Skip to main content

reifydb_routine/function/text/
concat.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4use reifydb_core::value::column::{Column, columns::Columns, data::ColumnData};
5use reifydb_type::{
6	util::bitvec::BitVec,
7	value::{constraint::bytes::MaxBytes, container::utf8::Utf8Container, r#type::Type},
8};
9
10use crate::function::{Function, FunctionCapability, FunctionContext, FunctionInfo, error::FunctionError};
11
12pub struct TextConcat {
13	info: FunctionInfo,
14}
15
16impl Default for TextConcat {
17	fn default() -> Self {
18		Self::new()
19	}
20}
21
22impl TextConcat {
23	pub fn new() -> Self {
24		Self {
25			info: FunctionInfo::new("text::concat"),
26		}
27	}
28}
29
30impl Function for TextConcat {
31	fn info(&self) -> &FunctionInfo {
32		&self.info
33	}
34
35	fn capabilities(&self) -> &[FunctionCapability] {
36		&[FunctionCapability::Scalar]
37	}
38
39	fn return_type(&self, _input_types: &[Type]) -> Type {
40		Type::Utf8
41	}
42
43	fn execute(&self, ctx: &FunctionContext, args: &Columns) -> Result<Columns, FunctionError> {
44		if args.len() < 2 {
45			return Err(FunctionError::ArityMismatch {
46				function: ctx.fragment.clone(),
47				expected: 2,
48				actual: args.len(),
49			});
50		}
51
52		// Unwrap options for each column individually
53		let mut unwrapped: Vec<(&ColumnData, Option<&BitVec>)> = Vec::with_capacity(args.len());
54		for col in args.iter() {
55			unwrapped.push(col.data().unwrap_option());
56		}
57
58		let row_count = unwrapped[0].0.len();
59
60		// Validate all arguments are Utf8
61		for (idx, (data, _)) in unwrapped.iter().enumerate() {
62			match data {
63				ColumnData::Utf8 {
64					..
65				} => {}
66				other => {
67					return Err(FunctionError::InvalidArgumentType {
68						function: ctx.fragment.clone(),
69						argument_index: idx,
70						expected: vec![Type::Utf8],
71						actual: other.get_type(),
72					});
73				}
74			}
75		}
76
77		let mut result_data = Vec::with_capacity(row_count);
78
79		for i in 0..row_count {
80			let mut all_defined = true;
81			let mut concatenated = String::new();
82
83			for (data, _) in unwrapped.iter() {
84				if let ColumnData::Utf8 {
85					container,
86					..
87				} = data
88				{
89					if container.is_defined(i) {
90						concatenated.push_str(&container[i]);
91					} else {
92						all_defined = false;
93						break;
94					}
95				}
96			}
97
98			if all_defined {
99				result_data.push(concatenated);
100			} else {
101				result_data.push(String::new());
102			}
103		}
104
105		let result_col_data = ColumnData::Utf8 {
106			container: Utf8Container::new(result_data),
107			max_bytes: MaxBytes::MAX,
108		};
109
110		// Combine all bitvecs
111		let mut combined_bv: Option<BitVec> = None;
112		for (_, bv) in unwrapped.iter() {
113			if let Some(bv) = bv {
114				combined_bv = Some(match combined_bv {
115					Some(existing) => existing.and(bv),
116					None => (*bv).clone(),
117				});
118			}
119		}
120
121		let final_data = match combined_bv {
122			Some(bv) => ColumnData::Option {
123				inner: Box::new(result_col_data),
124				bitvec: bv,
125			},
126			None => result_col_data,
127		};
128		Ok(Columns::new(vec![Column::new(ctx.fragment.clone(), final_data)]))
129	}
130}