1use reifydb_core::value::{column::ColumnData, container::Utf8Container};
5use reifydb_type::value::constraint::bytes::MaxBytes;
6
7use crate::function::{ScalarFunction, ScalarFunctionContext};
8
/// Unit suffixes paired with base 1024 by `FormatBytes`, ordered from smallest to largest.
const IEC_UNITS: [&str; 6] = ["B", "KiB", "MiB", "GiB", "TiB", "PiB"];
/// Unit suffixes paired with base 1000 by `FormatBytesSi`, ordered from smallest to largest.
const SI_UNITS: [&str; 6] = ["B", "KB", "MB", "GB", "TB", "PB"];
11
/// Renders a byte count as a human-readable string such as `"1.5 KiB"`.
///
/// * `bytes` - the byte count; negative values keep a leading `-`.
/// * `base` - ratio between two consecutive units (the callers in this file pass
///   `1024.0` or `1000.0`).
/// * `units` - unit suffixes ordered from smallest to largest; the last entry is
///   used for every value too large for the earlier ones.
///
/// Values below `base` are printed as a plain integer with `units[0]`. Larger values
/// are scaled and printed with at most two decimals, trailing zeros removed.
///
/// # Panics
///
/// Panics when `units` is empty and `bytes` is non-zero.
fn format_bytes_internal(bytes: i64, base: f64, units: &[&str]) -> String {
    if bytes == 0 {
        return "0 B".to_string();
    }

    // `unsigned_abs` cannot overflow for `i64::MIN`, unlike `abs`.
    let magnitude = bytes.unsigned_abs();
    let sign = if bytes < 0 { "-" } else { "" };
    let last_unit = units.len().saturating_sub(1);

    let mut unit_index = 0;
    let mut value = magnitude as f64;
    while value >= base && unit_index < last_unit {
        value /= base;
        unit_index += 1;
    }

    if unit_index == 0 {
        // Print the exact integer rather than round-tripping through `f64`.
        return format!("{}{} {}", sign, magnitude, units[0]);
    }

    let mut formatted = format!("{:.2}", value);

    // Rounding to two decimals can land exactly on the next unit boundary
    // (1023.999 KiB prints as "1024.00"); report that as the next unit
    // instead of "1024 KiB".
    let rounds_up_to_next_unit = unit_index < last_unit
        && matches!(formatted.parse::<f64>(), Ok(rounded) if rounded >= base);
    if rounds_up_to_next_unit {
        value /= base;
        unit_index += 1;
        formatted = format!("{:.2}", value);
    }

    // "{:.2}" always contains a '.', so only fractional zeros are stripped:
    // "1.50" -> "1.5", "1.00" -> "1", "100.00" -> "100".
    let trimmed = formatted.trim_end_matches('0').trim_end_matches('.');
    format!("{}{} {}", sign, trimmed, units[unit_index])
}
42
/// Formats every row of an integer container into a UTF-8 column of byte strings.
///
/// Arguments: the container, the number of rows to produce, the unit base and the
/// unit suffix table handed to `format_bytes_internal`.
///
/// Rows for which `get` returns `None` become an empty string flagged as undefined.
/// Values that do not fit in `i64` (only possible for `u64` input) saturate at
/// `i64::MAX` instead of wrapping around to a negative byte count.
macro_rules! process_int_column {
    ($container:expr, $row_count:expr, $base:expr, $units:expr) => {{
        let mut result_data = Vec::with_capacity($row_count);
        let mut result_bitvec = Vec::with_capacity($row_count);

        for i in 0..$row_count {
            if let Some(&value) = $container.get(i) {
                // Widen losslessly first, then clamp into the `i64` range; a bare
                // `as i64` would turn `u64` values above `i64::MAX` negative.
                let bytes = i128::from(value)
                    .clamp(i128::from(i64::MIN), i128::from(i64::MAX)) as i64;
                result_data.push(format_bytes_internal(bytes, $base, $units));
                result_bitvec.push(true);
            } else {
                result_data.push(String::new());
                result_bitvec.push(false);
            }
        }

        Ok(ColumnData::Utf8 {
            container: Utf8Container::new(result_data, result_bitvec.into()),
            max_bytes: MaxBytes::MAX,
        })
    }};
}
64
/// Formats every row of a floating-point container into a UTF-8 column of byte strings.
///
/// Each defined value is converted with `as i64` before formatting, so the fractional
/// part is dropped (Rust's float-to-int cast truncates toward zero, saturates at the
/// `i64` bounds and maps NaN to 0). Rows for which `get` returns `None` become an
/// empty string flagged as undefined.
macro_rules! process_float_column {
    ($container:expr, $row_count:expr, $base:expr, $units:expr) => {{
        let (texts, defined): (Vec<String>, Vec<bool>) = (0..$row_count)
            .map(|row| match $container.get(row) {
                Some(&value) => (format_bytes_internal(value as i64, $base, $units), true),
                None => (String::new(), false),
            })
            .unzip();

        Ok(ColumnData::Utf8 {
            container: Utf8Container::new(texts, defined.into()),
            max_bytes: MaxBytes::MAX,
        })
    }};
}
86
/// Formats every row of a decimal container into a UTF-8 column of byte strings.
///
/// Each defined value is rendered with `to_string()` and only the text before the
/// first `.` is used, so the fractional part is dropped. Rows for which `get`
/// returns `None` become an empty string flagged as undefined.
///
/// When the integer part cannot be parsed as `i64` (for example because it is out of
/// range), the whole text is parsed as `f64` and cast with `as i64`, which saturates
/// at the `i64` bounds. Previously such values were silently formatted as `"0 B"`.
/// Text that cannot be parsed either way still yields 0.
macro_rules! process_decimal_column {
    ($container:expr, $row_count:expr, $base:expr, $units:expr) => {{
        let mut result_data = Vec::with_capacity($row_count);
        let mut result_bitvec = Vec::with_capacity($row_count);

        for i in 0..$row_count {
            if let Some(value) = $container.get(i) {
                let s = value.to_string();
                let int_part = s.split('.').next().unwrap_or("0");
                let bytes = int_part.parse::<i64>().unwrap_or_else(|_| {
                    // Float-to-int `as` casts truncate toward zero and saturate,
                    // so huge magnitudes clamp instead of collapsing to zero.
                    s.parse::<f64>().map_or(0, |approx| approx as i64)
                });
                result_data.push(format_bytes_internal(bytes, $base, $units));
                result_bitvec.push(true);
            } else {
                result_data.push(String::new());
                result_bitvec.push(false);
            }
        }

        Ok(ColumnData::Utf8 {
            container: Utf8Container::new(result_data, result_bitvec.into()),
            max_bytes: MaxBytes::MAX,
        })
    }};
}
112
/// Scalar function that renders byte counts with 1024-based units
/// (`B`, `KiB`, `MiB`, `GiB`, `TiB`, `PiB`).
///
/// The type carries no state; `new()` and `default()` are interchangeable.
#[derive(Debug, Clone, Copy, Default)]
pub struct FormatBytes;

impl FormatBytes {
    /// Creates the (stateless) function object.
    pub fn new() -> Self {
        Self
    }
}
121
122impl ScalarFunction for FormatBytes {
123 fn scalar(&self, ctx: ScalarFunctionContext) -> crate::Result<ColumnData> {
124 let columns = ctx.columns;
125 let row_count = ctx.row_count;
126
127 if columns.is_empty() {
128 return Ok(ColumnData::utf8(Vec::<String>::new()));
129 }
130
131 let column = columns.get(0).unwrap();
132
133 match &column.data() {
134 ColumnData::Int1(container) => process_int_column!(container, row_count, 1024.0, &IEC_UNITS),
135 ColumnData::Int2(container) => process_int_column!(container, row_count, 1024.0, &IEC_UNITS),
136 ColumnData::Int4(container) => process_int_column!(container, row_count, 1024.0, &IEC_UNITS),
137 ColumnData::Int8(container) => process_int_column!(container, row_count, 1024.0, &IEC_UNITS),
138 ColumnData::Uint1(container) => process_int_column!(container, row_count, 1024.0, &IEC_UNITS),
139 ColumnData::Uint2(container) => process_int_column!(container, row_count, 1024.0, &IEC_UNITS),
140 ColumnData::Uint4(container) => process_int_column!(container, row_count, 1024.0, &IEC_UNITS),
141 ColumnData::Uint8(container) => process_int_column!(container, row_count, 1024.0, &IEC_UNITS),
142 ColumnData::Float4(container) => {
143 process_float_column!(container, row_count, 1024.0, &IEC_UNITS)
144 }
145 ColumnData::Float8(container) => {
146 process_float_column!(container, row_count, 1024.0, &IEC_UNITS)
147 }
148 ColumnData::Decimal {
149 container,
150 ..
151 } => {
152 process_decimal_column!(container, row_count, 1024.0, &IEC_UNITS)
153 }
154 _ => unimplemented!("FormatBytes only supports numeric input"),
155 }
156 }
157}
158
/// Scalar function that renders byte counts with 1000-based units
/// (`B`, `KB`, `MB`, `GB`, `TB`, `PB`).
///
/// The type carries no state; `new()` and `default()` are interchangeable.
#[derive(Debug, Clone, Copy, Default)]
pub struct FormatBytesSi;

impl FormatBytesSi {
    /// Creates the (stateless) function object.
    pub fn new() -> Self {
        Self
    }
}
167
168impl ScalarFunction for FormatBytesSi {
169 fn scalar(&self, ctx: ScalarFunctionContext) -> crate::Result<ColumnData> {
170 let columns = ctx.columns;
171 let row_count = ctx.row_count;
172
173 if columns.is_empty() {
174 return Ok(ColumnData::utf8(Vec::<String>::new()));
175 }
176
177 let column = columns.get(0).unwrap();
178
179 match &column.data() {
180 ColumnData::Int1(container) => process_int_column!(container, row_count, 1000.0, &SI_UNITS),
181 ColumnData::Int2(container) => process_int_column!(container, row_count, 1000.0, &SI_UNITS),
182 ColumnData::Int4(container) => process_int_column!(container, row_count, 1000.0, &SI_UNITS),
183 ColumnData::Int8(container) => process_int_column!(container, row_count, 1000.0, &SI_UNITS),
184 ColumnData::Uint1(container) => process_int_column!(container, row_count, 1000.0, &SI_UNITS),
185 ColumnData::Uint2(container) => process_int_column!(container, row_count, 1000.0, &SI_UNITS),
186 ColumnData::Uint4(container) => process_int_column!(container, row_count, 1000.0, &SI_UNITS),
187 ColumnData::Uint8(container) => process_int_column!(container, row_count, 1000.0, &SI_UNITS),
188 ColumnData::Float4(container) => process_float_column!(container, row_count, 1000.0, &SI_UNITS),
189 ColumnData::Float8(container) => process_float_column!(container, row_count, 1000.0, &SI_UNITS),
190 ColumnData::Decimal {
191 container,
192 ..
193 } => {
194 process_decimal_column!(container, row_count, 1000.0, &SI_UNITS)
195 }
196 _ => unimplemented!("FormatBytesSi only supports numeric input"),
197 }
198 }
199}
200
#[cfg(test)]
mod tests {
    use reifydb_core::value::column::{Column, Columns};

    use super::*;

    /// Runs `$function` over a single input column with `$row_count` rows and
    /// yields the UTF-8 container of the result, panicking on any other column type.
    macro_rules! format_column {
        ($function:expr, $column:expr, $row_count:expr) => {{
            let columns = Columns::new(vec![$column]);
            let ctx = ScalarFunctionContext {
                columns: &columns,
                row_count: $row_count,
            };

            let result = $function.scalar(ctx).unwrap();
            match result {
                ColumnData::Utf8 {
                    container,
                    ..
                } => container,
                _ => panic!("Expected UTF8 column data"),
            }
        }};
    }

    #[tokio::test]
    async fn test_format_bytes_binary_basic() {
        let function = FormatBytes::new();
        let column = Column::int8("bytes", vec![0i64, 512, 1024, 1536, 1048576, 1073741824]);

        let container = format_column!(function, column, 6);

        assert_eq!(container[0], "0 B");
        assert_eq!(container[1], "512 B");
        assert_eq!(container[2], "1 KiB");
        assert_eq!(container[3], "1.5 KiB");
        assert_eq!(container[4], "1 MiB");
        assert_eq!(container[5], "1 GiB");
    }

    #[tokio::test]
    async fn test_format_bytes_si_basic() {
        let function = FormatBytesSi::new();
        let column = Column::int8("bytes", vec![0i64, 500, 1000, 1500, 1000000, 1000000000]);

        let container = format_column!(function, column, 6);

        assert_eq!(container[0], "0 B");
        assert_eq!(container[1], "500 B");
        assert_eq!(container[2], "1 KB");
        assert_eq!(container[3], "1.5 KB");
        assert_eq!(container[4], "1 MB");
        assert_eq!(container[5], "1 GB");
    }

    #[tokio::test]
    async fn test_format_bytes_int4() {
        let function = FormatBytes::new();
        let column = Column::int4("bytes", vec![1024i32, 2048, 1048576]);

        let container = format_column!(function, column, 3);

        assert_eq!(container[0], "1 KiB");
        assert_eq!(container[1], "2 KiB");
        assert_eq!(container[2], "1 MiB");
    }

    #[tokio::test]
    async fn test_format_bytes_with_decimals() {
        let function = FormatBytes::new();
        let column = Column::int8("bytes", vec![1536i64, 2560, 1572864]);

        let container = format_column!(function, column, 3);

        assert_eq!(container[0], "1.5 KiB");
        assert_eq!(container[1], "2.5 KiB");
        assert_eq!(container[2], "1.5 MiB");
    }

    #[tokio::test]
    async fn test_format_bytes_large_values() {
        let function = FormatBytes::new();
        // 2^40 bytes and 2^50 bytes.
        let column = Column::int8("bytes", vec![1099511627776i64, 1125899906842624i64]);

        let container = format_column!(function, column, 2);

        assert_eq!(container[0], "1 TiB");
        assert_eq!(container[1], "1 PiB");
    }

    #[tokio::test]
    async fn test_format_bytes_with_null() {
        use reifydb_core::BitVec;

        let function = FormatBytes::new();

        // Row 1 is marked undefined; its stored value must not be formatted.
        let mut bitvec = BitVec::repeat(3, true);
        bitvec.set(1, false);
        let column = Column::int8_with_bitvec("bytes", vec![1024i64, 0, 2048], bitvec);

        let container = format_column!(function, column, 3);

        assert!(container.is_defined(0));
        assert!(!container.is_defined(1));
        assert!(container.is_defined(2));

        assert_eq!(container[0], "1 KiB");
        assert_eq!(container[2], "2 KiB");
    }

    #[tokio::test]
    async fn test_format_bytes_uint8() {
        let function = FormatBytes::new();
        let column = Column::uint8("bytes", vec![0u64, 1024, 1048576, 1073741824]);

        let container = format_column!(function, column, 4);

        assert_eq!(container[0], "0 B");
        assert_eq!(container[1], "1 KiB");
        assert_eq!(container[2], "1 MiB");
        assert_eq!(container[3], "1 GiB");
    }

    #[tokio::test]
    async fn test_format_bytes_uint4() {
        let function = FormatBytes::new();
        let column = Column::uint4("bytes", vec![512u32, 1024, 2048]);

        let container = format_column!(function, column, 3);

        assert_eq!(container[0], "512 B");
        assert_eq!(container[1], "1 KiB");
        assert_eq!(container[2], "2 KiB");
    }

    #[tokio::test]
    async fn test_format_bytes_float8() {
        let function = FormatBytes::new();
        let column = Column::float8("bytes", vec![1024.5f64, 1048576.0, 1572864.0]);

        let container = format_column!(function, column, 3);

        // Float input is cast to an integer first, so 1024.5 is formatted as 1024.
        assert_eq!(container[0], "1 KiB");
        assert_eq!(container[1], "1 MiB");
        assert_eq!(container[2], "1.5 MiB");
    }

    #[tokio::test]
    async fn test_format_bytes_float4() {
        let function = FormatBytes::new();
        let column = Column::float4("bytes", vec![512.9f32, 1024.0, 2048.5]);

        let container = format_column!(function, column, 3);

        // Fractions are dropped before formatting: 512.9 -> 512, 2048.5 -> 2048.
        assert_eq!(container[0], "512 B");
        assert_eq!(container[1], "1 KiB");
        assert_eq!(container[2], "2 KiB");
    }

    #[tokio::test]
    async fn test_format_bytes_decimal() {
        use std::str::FromStr;

        use reifydb_type::Decimal;

        let function = FormatBytes::new();
        let values = vec![
            Decimal::from_str("1024").unwrap(),
            Decimal::from_str("1048576.5").unwrap(),
            Decimal::from_str("1572864").unwrap(),
        ];
        let column = Column::new("bytes", ColumnData::decimal(values));

        let container = format_column!(function, column, 3);

        assert_eq!(container[0], "1 KiB");
        // Only the integer part of 1048576.5 is formatted.
        assert_eq!(container[1], "1 MiB");
        assert_eq!(container[2], "1.5 MiB");
    }
}