reifydb_routine/function/text/
concat.rs1use reifydb_core::value::column::{Column, columns::Columns, data::ColumnData};
5use reifydb_type::{
6 util::bitvec::BitVec,
7 value::{constraint::bytes::MaxBytes, container::utf8::Utf8Container, r#type::Type},
8};
9
10use crate::function::{Function, FunctionCapability, FunctionContext, FunctionInfo, error::FunctionError};
11
12pub struct TextConcat {
13 info: FunctionInfo,
14}
15
16impl Default for TextConcat {
17 fn default() -> Self {
18 Self::new()
19 }
20}
21
22impl TextConcat {
23 pub fn new() -> Self {
24 Self {
25 info: FunctionInfo::new("text::concat"),
26 }
27 }
28}
29
30impl Function for TextConcat {
31 fn info(&self) -> &FunctionInfo {
32 &self.info
33 }
34
35 fn capabilities(&self) -> &[FunctionCapability] {
36 &[FunctionCapability::Scalar]
37 }
38
39 fn return_type(&self, _input_types: &[Type]) -> Type {
40 Type::Utf8
41 }
42
43 fn execute(&self, ctx: &FunctionContext, args: &Columns) -> Result<Columns, FunctionError> {
44 if args.len() < 2 {
45 return Err(FunctionError::ArityMismatch {
46 function: ctx.fragment.clone(),
47 expected: 2,
48 actual: args.len(),
49 });
50 }
51
52 let mut unwrapped: Vec<(&ColumnData, Option<&BitVec>)> = Vec::with_capacity(args.len());
54 for col in args.iter() {
55 unwrapped.push(col.data().unwrap_option());
56 }
57
58 let row_count = unwrapped[0].0.len();
59
60 for (idx, (data, _)) in unwrapped.iter().enumerate() {
62 match data {
63 ColumnData::Utf8 {
64 ..
65 } => {}
66 other => {
67 return Err(FunctionError::InvalidArgumentType {
68 function: ctx.fragment.clone(),
69 argument_index: idx,
70 expected: vec![Type::Utf8],
71 actual: other.get_type(),
72 });
73 }
74 }
75 }
76
77 let mut result_data = Vec::with_capacity(row_count);
78
79 for i in 0..row_count {
80 let mut all_defined = true;
81 let mut concatenated = String::new();
82
83 for (data, _) in unwrapped.iter() {
84 if let ColumnData::Utf8 {
85 container,
86 ..
87 } = data
88 {
89 if container.is_defined(i) {
90 concatenated.push_str(&container[i]);
91 } else {
92 all_defined = false;
93 break;
94 }
95 }
96 }
97
98 if all_defined {
99 result_data.push(concatenated);
100 } else {
101 result_data.push(String::new());
102 }
103 }
104
105 let result_col_data = ColumnData::Utf8 {
106 container: Utf8Container::new(result_data),
107 max_bytes: MaxBytes::MAX,
108 };
109
110 let mut combined_bv: Option<BitVec> = None;
112 for (_, bv) in unwrapped.iter() {
113 if let Some(bv) = bv {
114 combined_bv = Some(match combined_bv {
115 Some(existing) => existing.and(bv),
116 None => (*bv).clone(),
117 });
118 }
119 }
120
121 let final_data = match combined_bv {
122 Some(bv) => ColumnData::Option {
123 inner: Box::new(result_col_data),
124 bitvec: bv,
125 },
126 None => result_col_data,
127 };
128 Ok(Columns::new(vec![Column::new(ctx.fragment.clone(), final_data)]))
129 }
130}