1use reifydb_core::value::column::data::ColumnData;
5use reifydb_type::value::{constraint::bytes::MaxBytes, container::utf8::Utf8Container, r#type::Type};
6
7use crate::{ScalarFunction, ScalarFunctionContext, error::ScalarFunctionError, propagate_options};
8
9const IEC_UNITS: [&str; 6] = ["B", "KiB", "MiB", "GiB", "TiB", "PiB"];
10
11pub(super) fn format_bytes_internal(bytes: i64, base: f64, units: &[&str]) -> String {
12 if bytes == 0 {
13 return "0 B".to_string();
14 }
15
16 let bytes_abs = bytes.unsigned_abs() as f64;
17 let sign = if bytes < 0 {
18 "-"
19 } else {
20 ""
21 };
22
23 let mut unit_index = 0;
24 let mut value = bytes_abs;
25
26 while value >= base && unit_index < units.len() - 1 {
27 value /= base;
28 unit_index += 1;
29 }
30
31 if unit_index == 0 {
32 format!("{}{} {}", sign, bytes_abs as i64, units[0])
33 } else if value == value.floor() {
34 format!("{}{} {}", sign, value as i64, units[unit_index])
35 } else {
36 let formatted = format!("{:.2}", value);
37 let trimmed = formatted.trim_end_matches('0').trim_end_matches('.');
38 format!("{}{} {}", sign, trimmed, units[unit_index])
39 }
40}
41
42#[macro_export]
43macro_rules! process_int_column {
44 ($container:expr, $row_count:expr, $base:expr, $units:expr) => {{
45 let mut result_data = Vec::with_capacity($row_count);
46
47 for i in 0..$row_count {
48 if let Some(&value) = $container.get(i) {
49 result_data.push(format_bytes_internal(value as i64, $base, $units));
50 } else {
51 result_data.push(String::new());
52 }
53 }
54
55 Ok(ColumnData::Utf8 {
56 container: Utf8Container::new(result_data),
57 max_bytes: MaxBytes::MAX,
58 })
59 }};
60}
61
62#[macro_export]
63macro_rules! process_float_column {
64 ($container:expr, $row_count:expr, $base:expr, $units:expr) => {{
65 let mut result_data = Vec::with_capacity($row_count);
66
67 for i in 0..$row_count {
68 if let Some(&value) = $container.get(i) {
69 result_data.push(format_bytes_internal(value as i64, $base, $units));
70 } else {
71 result_data.push(String::new());
72 }
73 }
74
75 Ok(ColumnData::Utf8 {
76 container: Utf8Container::new(result_data),
77 max_bytes: MaxBytes::MAX,
78 })
79 }};
80}
81
82#[macro_export]
83macro_rules! process_decimal_column {
84 ($container:expr, $row_count:expr, $base:expr, $units:expr) => {{
85 let mut result_data = Vec::with_capacity($row_count);
86
87 for i in 0..$row_count {
88 if let Some(value) = $container.get(i) {
89 let s = value.to_string();
91 let int_part = s.split('.').next().unwrap_or("0");
92 let bytes = int_part.parse::<i64>().unwrap_or(0);
93 result_data.push(format_bytes_internal(bytes, $base, $units));
94 } else {
95 result_data.push(String::new());
96 }
97 }
98
99 Ok(ColumnData::Utf8 {
100 container: Utf8Container::new(result_data),
101 max_bytes: MaxBytes::MAX,
102 })
103 }};
104}
105
106pub struct FormatBytes;
108
109impl FormatBytes {
110 pub fn new() -> Self {
111 Self
112 }
113}
114
115impl ScalarFunction for FormatBytes {
116 fn scalar(&self, ctx: ScalarFunctionContext) -> crate::error::ScalarFunctionResult<ColumnData> {
117 if let Some(result) = propagate_options(self, &ctx) {
118 return result;
119 }
120
121 let columns = ctx.columns;
122 let row_count = ctx.row_count;
123
124 if columns.len() != 1 {
125 return Err(ScalarFunctionError::ArityMismatch {
126 function: ctx.fragment.clone(),
127 expected: 1,
128 actual: columns.len(),
129 });
130 }
131
132 let column = columns.get(0).unwrap();
133
134 match &column.data() {
135 ColumnData::Int1(container) => process_int_column!(container, row_count, 1024.0, &IEC_UNITS),
136 ColumnData::Int2(container) => process_int_column!(container, row_count, 1024.0, &IEC_UNITS),
137 ColumnData::Int4(container) => process_int_column!(container, row_count, 1024.0, &IEC_UNITS),
138 ColumnData::Int8(container) => process_int_column!(container, row_count, 1024.0, &IEC_UNITS),
139 ColumnData::Uint1(container) => process_int_column!(container, row_count, 1024.0, &IEC_UNITS),
140 ColumnData::Uint2(container) => process_int_column!(container, row_count, 1024.0, &IEC_UNITS),
141 ColumnData::Uint4(container) => process_int_column!(container, row_count, 1024.0, &IEC_UNITS),
142 ColumnData::Uint8(container) => process_int_column!(container, row_count, 1024.0, &IEC_UNITS),
143 ColumnData::Float4(container) => {
144 process_float_column!(container, row_count, 1024.0, &IEC_UNITS)
145 }
146 ColumnData::Float8(container) => {
147 process_float_column!(container, row_count, 1024.0, &IEC_UNITS)
148 }
149 ColumnData::Decimal {
150 container,
151 ..
152 } => {
153 process_decimal_column!(container, row_count, 1024.0, &IEC_UNITS)
154 }
155 other => Err(ScalarFunctionError::InvalidArgumentType {
156 function: ctx.fragment.clone(),
157 argument_index: 0,
158 expected: vec![
159 Type::Int1,
160 Type::Int2,
161 Type::Int4,
162 Type::Int8,
163 Type::Uint1,
164 Type::Uint2,
165 Type::Uint4,
166 Type::Uint8,
167 Type::Float4,
168 Type::Float8,
169 Type::Decimal,
170 ],
171 actual: other.get_type(),
172 }),
173 }
174 }
175
176 fn return_type(&self, _input_types: &[Type]) -> Type {
177 Type::Utf8
178 }
179}
180
181pub(super) use process_decimal_column;
182pub(super) use process_float_column;
183pub(super) use process_int_column;