reifydb_routine/function/text/
concat.rs1use reifydb_core::value::column::{ColumnWithName, buffer::ColumnBuffer, columns::Columns};
5use reifydb_type::{
6 util::bitvec::BitVec,
7 value::{constraint::bytes::MaxBytes, container::utf8::Utf8Container, r#type::Type},
8};
9
10use crate::routine::{Function, FunctionKind, Routine, RoutineInfo, context::FunctionContext, error::RoutineError};
11
12pub struct TextConcat {
13 info: RoutineInfo,
14}
15
16impl Default for TextConcat {
17 fn default() -> Self {
18 Self::new()
19 }
20}
21
22impl TextConcat {
23 pub fn new() -> Self {
24 Self {
25 info: RoutineInfo::new("text::concat"),
26 }
27 }
28}
29
30impl<'a> Routine<FunctionContext<'a>> for TextConcat {
31 fn info(&self) -> &RoutineInfo {
32 &self.info
33 }
34
35 fn return_type(&self, _input_types: &[Type]) -> Type {
36 Type::Utf8
37 }
38
39 fn execute(&self, ctx: &mut FunctionContext<'a>, args: &Columns) -> Result<Columns, RoutineError> {
40 if args.len() < 2 {
41 return Err(RoutineError::FunctionArityMismatch {
42 function: ctx.fragment.clone(),
43 expected: 2,
44 actual: args.len(),
45 });
46 }
47
48 let mut unwrapped: Vec<(&ColumnBuffer, Option<&BitVec>)> = Vec::with_capacity(args.len());
49 for col in args.iter() {
50 unwrapped.push(col.data().unwrap_option());
51 }
52
53 let row_count = unwrapped[0].0.len();
54
55 for (idx, (data, _)) in unwrapped.iter().enumerate() {
56 match data {
57 ColumnBuffer::Utf8 {
58 ..
59 } => {}
60 other => {
61 return Err(RoutineError::FunctionInvalidArgumentType {
62 function: ctx.fragment.clone(),
63 argument_index: idx,
64 expected: vec![Type::Utf8],
65 actual: other.get_type(),
66 });
67 }
68 }
69 }
70
71 let mut result_data = Vec::with_capacity(row_count);
72
73 for i in 0..row_count {
74 let mut all_defined = true;
75 let mut concatenated = String::new();
76
77 for (data, _) in unwrapped.iter() {
78 if let ColumnBuffer::Utf8 {
79 container,
80 ..
81 } = data
82 {
83 if container.is_defined(i) {
84 concatenated.push_str(container.get(i).unwrap());
85 } else {
86 all_defined = false;
87 break;
88 }
89 }
90 }
91
92 if all_defined {
93 result_data.push(concatenated);
94 } else {
95 result_data.push(String::new());
96 }
97 }
98
99 let result_col_data = ColumnBuffer::Utf8 {
100 container: Utf8Container::new(result_data),
101 max_bytes: MaxBytes::MAX,
102 };
103
104 let mut combined_bv: Option<BitVec> = None;
105 for (_, bv) in unwrapped.iter() {
106 if let Some(bv) = bv {
107 combined_bv = Some(match combined_bv {
108 Some(existing) => existing.and(bv),
109 None => (*bv).clone(),
110 });
111 }
112 }
113
114 let final_data = match combined_bv {
115 Some(bv) => ColumnBuffer::Option {
116 inner: Box::new(result_col_data),
117 bitvec: bv,
118 },
119 None => result_col_data,
120 };
121 Ok(Columns::new(vec![ColumnWithName::new(ctx.fragment.clone(), final_data)]))
122 }
123}
124
125impl Function for TextConcat {
126 fn kinds(&self) -> &[FunctionKind] {
127 &[FunctionKind::Scalar]
128 }
129}