1use crate::arrow_store::{ArrowSheet, IngestBuilder};
2use crate::engine::Engine;
3use crate::traits::EvaluationContext;
4use chrono::Timelike;
5use formualizer_common::{ExcelError, LiteralValue};
6use rustc_hash::FxHashMap;
7
/// Totals reported by a finished Arrow bulk ingest.
///
/// Derives `PartialEq`/`Eq` so a whole summary can be compared in one assertion.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct ArrowBulkIngestSummary {
    /// Number of sheets that were registered on the ingest builder.
    pub sheets: usize,
    /// Rows appended across all registered sheets.
    pub total_rows: usize,
}
13
14pub struct ArrowBulkIngestBuilder<'e, R: EvaluationContext> {
16 engine: &'e mut Engine<R>,
17 builders: FxHashMap<String, IngestBuilder>,
18 rows: FxHashMap<String, usize>,
19}
20
21impl<'e, R: EvaluationContext> ArrowBulkIngestBuilder<'e, R> {
22 pub fn new(engine: &'e mut Engine<R>) -> Self {
23 Self {
24 engine,
25 builders: FxHashMap::default(),
26 rows: FxHashMap::default(),
27 }
28 }
29
30 pub fn add_sheet(&mut self, name: &str, ncols: usize, chunk_rows: usize) {
32 let ib = IngestBuilder::new(name, ncols, chunk_rows, self.engine.config.date_system);
33 self.builders.insert(name.to_string(), ib);
34 self.rows.insert(name.to_string(), 0);
35 self.engine.sheet_id_mut(name);
36 }
37
38 pub fn append_row(&mut self, name: &str, row: &[LiteralValue]) -> Result<(), ExcelError> {
40 let ib = self
41 .builders
42 .get_mut(name)
43 .expect("sheet must be added before append_row");
44 ib.append_row(row)?;
45 *self.rows.get_mut(name).unwrap() += 1;
46 Ok(())
47 }
48
49 pub fn finish(mut self) -> Result<ArrowBulkIngestSummary, ExcelError> {
51 let mut sheets: Vec<(String, ArrowSheet)> = Vec::with_capacity(self.builders.len());
52 for (name, builder) in self.builders.drain() {
53 let sheet = builder.finish();
54 sheets.push((name, sheet));
55 }
56 for (name, sheet) in sheets {
58 let store = self.engine.sheet_store_mut();
59 if let Some(pos) = store.sheets.iter().position(|s| s.name.as_ref() == name) {
60 store.sheets[pos] = sheet;
61 } else {
62 store.sheets.push(sheet);
63 }
64 }
65 let total_rows = self.rows.values().copied().sum();
66 Ok(ArrowBulkIngestSummary {
67 sheets: self.rows.len(),
68 total_rows,
69 })
70 }
71}
72
73pub struct ArrowBulkUpdateBuilder<'e, R: EvaluationContext> {
75 engine: &'e mut Engine<R>,
76 updates: FxHashMap<String, FxHashMap<usize, FxHashMap<usize, LiteralValue>>>,
78}
79
80impl<'e, R: EvaluationContext> ArrowBulkUpdateBuilder<'e, R> {
81 pub fn new(engine: &'e mut Engine<R>) -> Self {
82 Self {
83 engine,
84 updates: FxHashMap::default(),
85 }
86 }
87
88 pub fn update_cell(&mut self, sheet: &str, row: u32, col: u32, value: LiteralValue) {
89 let s = self.updates.entry(sheet.to_string()).or_default();
90 let c = s.entry(col.saturating_sub(1) as usize).or_default();
91 c.insert(row.saturating_sub(1) as usize, value);
92 }
93
94 pub fn finish(mut self) -> Result<usize, ExcelError> {
95 use std::sync::Arc;
96 let date_system = self.engine.config.date_system;
97 let mut total = 0usize;
98 for (sheet_name, by_col) in self.updates.drain() {
99 let maybe_sheet = self.engine.sheet_store_mut().sheet_mut(&sheet_name);
100 if maybe_sheet.is_none() {
101 continue;
102 }
103 let sheet = maybe_sheet.unwrap();
104 for (col0, rows_map) in by_col {
105 total += rows_map.len();
106 if col0 >= sheet.columns.len() {
107 continue;
108 }
109 let mut by_chunk: FxHashMap<usize, Vec<(usize, LiteralValue)>> =
111 FxHashMap::default();
112 for (row0, v) in rows_map {
113 if row0 >= sheet.nrows as usize {
114 sheet.ensure_row_capacity(row0 + 1);
115 }
116 if let Some((ch_idx, in_off)) = sheet.chunk_of_row(row0) {
117 by_chunk.entry(ch_idx).or_default().push((in_off, v));
118 }
119 }
120 for (ch_idx, mut items) in by_chunk {
121 let Some(ch) = sheet.ensure_column_chunk_mut(col0, ch_idx) else {
122 continue;
123 };
124 let len = ch.type_tag.len();
125 let rebuild = items.len() > len / 50 || items.len() > 1024;
127 if !rebuild {
128 for (off, v) in items {
130 let ov = match v {
131 LiteralValue::Empty => crate::arrow_store::OverlayValue::Empty,
132 LiteralValue::Int(i) => {
133 crate::arrow_store::OverlayValue::Number(i as f64)
134 }
135 LiteralValue::Number(n) => {
136 crate::arrow_store::OverlayValue::Number(n)
137 }
138 LiteralValue::Boolean(b) => {
139 crate::arrow_store::OverlayValue::Boolean(b)
140 }
141 LiteralValue::Text(s) => {
142 crate::arrow_store::OverlayValue::Text(Arc::from(s))
143 }
144 LiteralValue::Error(e) => crate::arrow_store::OverlayValue::Error(
145 crate::arrow_store::map_error_code(e.kind),
146 ),
147 LiteralValue::Date(d) => {
148 let dt = d.and_hms_opt(0, 0, 0).unwrap();
149 let serial = crate::builtins::datetime::datetime_to_serial_for(
150 date_system,
151 &dt,
152 );
153 crate::arrow_store::OverlayValue::DateTime(serial)
154 }
155 LiteralValue::DateTime(dt) => {
156 let serial = crate::builtins::datetime::datetime_to_serial_for(
157 date_system,
158 &dt,
159 );
160 crate::arrow_store::OverlayValue::DateTime(serial)
161 }
162 LiteralValue::Time(t) => {
163 let serial = t.num_seconds_from_midnight() as f64 / 86_400.0;
164 crate::arrow_store::OverlayValue::DateTime(serial)
165 }
166 LiteralValue::Duration(d) => {
167 let serial = d.num_seconds() as f64 / 86_400.0;
168 crate::arrow_store::OverlayValue::Duration(serial)
169 }
170 LiteralValue::Pending => crate::arrow_store::OverlayValue::Pending,
171 LiteralValue::Array(_) => crate::arrow_store::OverlayValue::Error(
172 crate::arrow_store::map_error_code(
173 formualizer_common::ExcelErrorKind::Value,
174 ),
175 ),
176 };
177 let _ = ch.overlay.set(off, ov);
178 }
179 } else {
180 use arrow_array::Array as _;
182 use arrow_array::builder::{
183 BooleanBuilder, Float64Builder, StringBuilder, UInt8Builder,
184 };
185 items.sort_by_key(|(o, _)| *o);
186 let mut tag_b = UInt8Builder::with_capacity(len);
187 let mut nb = Float64Builder::with_capacity(len);
188 let mut bb = BooleanBuilder::with_capacity(len);
189 let mut sb = StringBuilder::with_capacity(len, len * 8);
190 let mut eb = UInt8Builder::with_capacity(len);
191 let mut non_num = 0usize;
192 let mut non_bool = 0usize;
193 let mut non_text = 0usize;
194 let mut non_err = 0usize;
195 let mut it = items.into_iter().peekable();
196 for i in 0..len {
197 let upd = if it.peek().map(|(o, _)| *o == i).unwrap_or(false) {
198 Some(it.next().unwrap().1)
199 } else {
200 None
201 };
202 let val = if let Some(v) = upd {
203 v
204 } else {
205 let t = crate::arrow_store::TypeTag::from_u8(ch.type_tag.value(i));
207 match t {
208 crate::arrow_store::TypeTag::Empty => LiteralValue::Empty,
209 crate::arrow_store::TypeTag::Number => {
210 if let Some(a) = &ch.numbers {
211 let fa = a
212 .as_any()
213 .downcast_ref::<arrow_array::Float64Array>()
214 .unwrap();
215 if fa.is_null(i) {
216 LiteralValue::Empty
217 } else {
218 LiteralValue::Number(fa.value(i))
219 }
220 } else {
221 LiteralValue::Empty
222 }
223 }
224 crate::arrow_store::TypeTag::DateTime => {
225 if let Some(a) = &ch.numbers {
226 let fa = a
227 .as_any()
228 .downcast_ref::<arrow_array::Float64Array>()
229 .unwrap();
230 if fa.is_null(i) {
231 LiteralValue::Empty
232 } else {
233 LiteralValue::from_serial_number(fa.value(i))
234 }
235 } else {
236 LiteralValue::Empty
237 }
238 }
239 crate::arrow_store::TypeTag::Duration => {
240 if let Some(a) = &ch.numbers {
241 let fa = a
242 .as_any()
243 .downcast_ref::<arrow_array::Float64Array>()
244 .unwrap();
245 if fa.is_null(i) {
246 LiteralValue::Empty
247 } else {
248 let serial = fa.value(i);
249 let nanos_f = serial * 86_400.0 * 1_000_000_000.0;
250 let nanos = nanos_f
251 .round()
252 .clamp(i64::MIN as f64, i64::MAX as f64)
253 as i64;
254 LiteralValue::Duration(
255 chrono::Duration::nanoseconds(nanos),
256 )
257 }
258 } else {
259 LiteralValue::Empty
260 }
261 }
262 crate::arrow_store::TypeTag::Boolean => {
263 if let Some(a) = &ch.booleans {
264 let ba = a
265 .as_any()
266 .downcast_ref::<arrow_array::BooleanArray>()
267 .unwrap();
268 if ba.is_null(i) {
269 LiteralValue::Empty
270 } else {
271 LiteralValue::Boolean(ba.value(i))
272 }
273 } else {
274 LiteralValue::Empty
275 }
276 }
277 crate::arrow_store::TypeTag::Text => {
278 if let Some(a) = &ch.text {
279 let sa = a
280 .as_any()
281 .downcast_ref::<arrow_array::StringArray>()
282 .unwrap();
283 if sa.is_null(i) {
284 LiteralValue::Empty
285 } else {
286 LiteralValue::Text(sa.value(i).to_string())
287 }
288 } else {
289 LiteralValue::Empty
290 }
291 }
292 crate::arrow_store::TypeTag::Error => {
293 if let Some(a) = &ch.errors {
294 let ea = a
295 .as_any()
296 .downcast_ref::<arrow_array::UInt8Array>()
297 .unwrap();
298 if ea.is_null(i) {
299 LiteralValue::Empty
300 } else {
301 LiteralValue::Error(ExcelError::new(
302 crate::arrow_store::unmap_error_code(
303 ea.value(i),
304 ),
305 ))
306 }
307 } else {
308 LiteralValue::Empty
309 }
310 }
311 crate::arrow_store::TypeTag::Pending => LiteralValue::Pending,
312 }
313 };
314 match val {
315 LiteralValue::Empty => {
316 tag_b.append_value(crate::arrow_store::TypeTag::Empty as u8);
317 nb.append_null();
318 bb.append_null();
319 sb.append_null();
320 eb.append_null();
321 }
322 LiteralValue::Int(i) => {
323 tag_b.append_value(crate::arrow_store::TypeTag::Number as u8);
324 nb.append_value(i as f64);
325 non_num += 1;
326 bb.append_null();
327 sb.append_null();
328 eb.append_null();
329 }
330 LiteralValue::Number(n) => {
331 tag_b.append_value(crate::arrow_store::TypeTag::Number as u8);
332 nb.append_value(n);
333 non_num += 1;
334 bb.append_null();
335 sb.append_null();
336 eb.append_null();
337 }
338 LiteralValue::Boolean(b) => {
339 tag_b.append_value(crate::arrow_store::TypeTag::Boolean as u8);
340 nb.append_null();
341 bb.append_value(b);
342 non_bool += 1;
343 sb.append_null();
344 eb.append_null();
345 }
346 LiteralValue::Text(s) => {
347 tag_b.append_value(crate::arrow_store::TypeTag::Text as u8);
348 nb.append_null();
349 bb.append_null();
350 sb.append_value(&s);
351 non_text += 1;
352 eb.append_null();
353 }
354 LiteralValue::Error(e) => {
355 tag_b.append_value(crate::arrow_store::TypeTag::Error as u8);
356 nb.append_null();
357 bb.append_null();
358 sb.append_null();
359 eb.append_value(crate::arrow_store::map_error_code(e.kind));
360 non_err += 1;
361 }
362 LiteralValue::Date(d) => {
363 tag_b.append_value(crate::arrow_store::TypeTag::Number as u8);
364 let dt = d.and_hms_opt(0, 0, 0).unwrap();
365 let serial = crate::builtins::datetime::datetime_to_serial_for(
366 date_system,
367 &dt,
368 );
369 nb.append_value(serial);
370 non_num += 1;
371 bb.append_null();
372 sb.append_null();
373 eb.append_null();
374 }
375 LiteralValue::DateTime(dt) => {
376 tag_b.append_value(crate::arrow_store::TypeTag::Number as u8);
377 let serial = crate::builtins::datetime::datetime_to_serial_for(
378 date_system,
379 &dt,
380 );
381 nb.append_value(serial);
382 non_num += 1;
383 bb.append_null();
384 sb.append_null();
385 eb.append_null();
386 }
387 LiteralValue::Time(t) => {
388 tag_b.append_value(crate::arrow_store::TypeTag::Number as u8);
389 let serial = t.num_seconds_from_midnight() as f64 / 86_400.0;
390 nb.append_value(serial);
391 non_num += 1;
392 bb.append_null();
393 sb.append_null();
394 eb.append_null();
395 }
396 LiteralValue::Duration(d) => {
397 tag_b.append_value(crate::arrow_store::TypeTag::Number as u8);
398 let serial = d.num_seconds() as f64 / 86_400.0;
399 nb.append_value(serial);
400 non_num += 1;
401 bb.append_null();
402 sb.append_null();
403 eb.append_null();
404 }
405 LiteralValue::Pending => {
406 tag_b.append_value(crate::arrow_store::TypeTag::Pending as u8);
407 nb.append_null();
408 bb.append_null();
409 sb.append_null();
410 eb.append_null();
411 }
412 LiteralValue::Array(_) => {
413 tag_b.append_value(crate::arrow_store::TypeTag::Error as u8);
414 nb.append_null();
415 bb.append_null();
416 sb.append_null();
417 eb.append_value(crate::arrow_store::map_error_code(
418 formualizer_common::ExcelErrorKind::Value,
419 ));
420 non_err += 1;
421 }
422 }
423 }
424 ch.type_tag = Arc::new(tag_b.finish());
425 ch.numbers = if non_num == 0 {
426 None
427 } else {
428 Some(Arc::new(nb.finish()))
429 };
430 ch.booleans = if non_bool == 0 {
431 None
432 } else {
433 Some(Arc::new(bb.finish()))
434 };
435 ch.text = if non_text == 0 {
436 None
437 } else {
438 Some(Arc::new(sb.finish()))
439 };
440 ch.errors = if non_err == 0 {
441 None
442 } else {
443 Some(Arc::new(eb.finish()))
444 };
445 ch.meta.len = len;
446 ch.meta.non_null_num = non_num;
447 ch.meta.non_null_bool = non_bool;
448 ch.meta.non_null_text = non_text;
449 ch.meta.non_null_err = non_err;
450 let _ = ch.overlay.clear();
451 }
452 }
453 }
454 }
455 self.engine.mark_data_edited();
457 Ok(total)
458 }
459}
460
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::EvalConfig;
    use crate::test_workbook::TestWorkbook;

    /// Ingests two rows into a three-column sheet with two-row chunks, then checks the
    /// summary and the shape of the stored sheet.
    #[test]
    fn arrow_bulk_ingest_basic() {
        let mut engine = Engine::new(TestWorkbook::default(), EvalConfig::default());

        let rows = [
            [
                LiteralValue::Number(1.0),
                LiteralValue::Text(String::from("a")),
                LiteralValue::Empty,
            ],
            [
                LiteralValue::Boolean(true),
                LiteralValue::Text(String::new()),
                LiteralValue::Error(formualizer_common::ExcelError::new_value()),
            ],
        ];

        let summary = {
            let mut ingest = engine.begin_bulk_ingest_arrow();
            ingest.add_sheet("S", 3, 2);
            for row in &rows {
                ingest.append_row("S", row).unwrap();
            }
            ingest.finish().unwrap()
        };
        assert_eq!(summary.sheets, 1);
        assert_eq!(summary.total_rows, 2);

        let store = engine.sheet_store();
        let sheet = store.sheet("S").expect("arrow sheet present");
        assert_eq!(sheet.columns.len(), 3);
        assert_eq!(sheet.nrows, 2);
        // Two rows with a chunk size of two: every column holds exactly one full chunk.
        sheet.columns.iter().for_each(|col| {
            assert_eq!(col.chunks.len(), 1);
            assert_eq!(col.chunks[0].len(), 2);
        });
    }
}