reifydb_routine/function/text/
concat.rs1use reifydb_core::value::column::{ColumnWithName, buffer::ColumnBuffer, columns::Columns};
5use reifydb_type::{
6 util::bitvec::BitVec,
7 value::{constraint::bytes::MaxBytes, container::utf8::Utf8Container, r#type::Type},
8};
9
10use crate::routine::{Function, FunctionKind, Routine, RoutineInfo, context::FunctionContext, error::RoutineError};
11
12pub struct TextConcat {
13 info: RoutineInfo,
14}
15
16impl Default for TextConcat {
17 fn default() -> Self {
18 Self::new()
19 }
20}
21
22impl TextConcat {
23 pub fn new() -> Self {
24 Self {
25 info: RoutineInfo::new("text::concat"),
26 }
27 }
28}
29
30impl<'a> Routine<FunctionContext<'a>> for TextConcat {
31 fn info(&self) -> &RoutineInfo {
32 &self.info
33 }
34
35 fn return_type(&self, _input_types: &[Type]) -> Type {
36 Type::Utf8
37 }
38
39 fn execute(&self, ctx: &mut FunctionContext<'a>, args: &Columns) -> Result<Columns, RoutineError> {
40 if args.len() < 2 {
41 return Err(RoutineError::FunctionArityMismatch {
42 function: ctx.fragment.clone(),
43 expected: 2,
44 actual: args.len(),
45 });
46 }
47
48 let mut unwrapped: Vec<(&ColumnBuffer, Option<&BitVec>)> = Vec::with_capacity(args.len());
50 for col in args.iter() {
51 unwrapped.push(col.data().unwrap_option());
52 }
53
54 let row_count = unwrapped[0].0.len();
55
56 for (idx, (data, _)) in unwrapped.iter().enumerate() {
58 match data {
59 ColumnBuffer::Utf8 {
60 ..
61 } => {}
62 other => {
63 return Err(RoutineError::FunctionInvalidArgumentType {
64 function: ctx.fragment.clone(),
65 argument_index: idx,
66 expected: vec![Type::Utf8],
67 actual: other.get_type(),
68 });
69 }
70 }
71 }
72
73 let mut result_data = Vec::with_capacity(row_count);
74
75 for i in 0..row_count {
76 let mut all_defined = true;
77 let mut concatenated = String::new();
78
79 for (data, _) in unwrapped.iter() {
80 if let ColumnBuffer::Utf8 {
81 container,
82 ..
83 } = data
84 {
85 if container.is_defined(i) {
86 concatenated.push_str(container.get(i).unwrap());
87 } else {
88 all_defined = false;
89 break;
90 }
91 }
92 }
93
94 if all_defined {
95 result_data.push(concatenated);
96 } else {
97 result_data.push(String::new());
98 }
99 }
100
101 let result_col_data = ColumnBuffer::Utf8 {
102 container: Utf8Container::new(result_data),
103 max_bytes: MaxBytes::MAX,
104 };
105
106 let mut combined_bv: Option<BitVec> = None;
108 for (_, bv) in unwrapped.iter() {
109 if let Some(bv) = bv {
110 combined_bv = Some(match combined_bv {
111 Some(existing) => existing.and(bv),
112 None => (*bv).clone(),
113 });
114 }
115 }
116
117 let final_data = match combined_bv {
118 Some(bv) => ColumnBuffer::Option {
119 inner: Box::new(result_col_data),
120 bitvec: bv,
121 },
122 None => result_col_data,
123 };
124 Ok(Columns::new(vec![ColumnWithName::new(ctx.fragment.clone(), final_data)]))
125 }
126}
127
128impl Function for TextConcat {
129 fn kinds(&self) -> &[FunctionKind] {
130 &[FunctionKind::Scalar]
131 }
132}