1use crate::arrow_store::{ArrowSheet, IngestBuilder};
2use crate::engine::Engine;
3use crate::traits::EvaluationContext;
4use chrono::Timelike;
5use formualizer_common::{ExcelError, LiteralValue};
6use rustc_hash::FxHashMap;
7
/// Totals reported after a bulk Arrow ingest has been committed to the engine.
#[derive(Clone, Debug, Default)]
pub struct ArrowBulkIngestSummary {
    /// Number of distinct sheet names that were registered with the builder.
    pub sheets: usize,
    /// Rows appended across all registered sheets.
    pub total_rows: usize,
}
13
14pub struct ArrowBulkIngestBuilder<'e, R: EvaluationContext> {
16 engine: &'e mut Engine<R>,
17 builders: FxHashMap<String, IngestBuilder>,
18 rows: FxHashMap<String, usize>,
19}
20
21impl<'e, R: EvaluationContext> ArrowBulkIngestBuilder<'e, R> {
22 pub fn new(engine: &'e mut Engine<R>) -> Self {
23 Self {
24 engine,
25 builders: FxHashMap::default(),
26 rows: FxHashMap::default(),
27 }
28 }
29
30 pub fn add_sheet(&mut self, name: &str, ncols: usize, chunk_rows: usize) {
32 let ib = IngestBuilder::new(name, ncols, chunk_rows, self.engine.config.date_system);
33 self.builders.insert(name.to_string(), ib);
34 self.rows.insert(name.to_string(), 0);
35 self.engine.graph.sheet_id_mut(name);
37 }
38
39 pub fn append_row(&mut self, name: &str, row: &[LiteralValue]) -> Result<(), ExcelError> {
41 let ib = self
42 .builders
43 .get_mut(name)
44 .expect("sheet must be added before append_row");
45 ib.append_row(row)?;
46 *self.rows.get_mut(name).unwrap() += 1;
47 Ok(())
48 }
49
50 pub fn finish(mut self) -> Result<ArrowBulkIngestSummary, ExcelError> {
52 let mut sheets: Vec<(String, ArrowSheet)> = Vec::with_capacity(self.builders.len());
53 for (name, builder) in self.builders.drain() {
54 let sheet = builder.finish();
55 sheets.push((name, sheet));
56 }
57 for (name, sheet) in sheets {
59 let store = self.engine.sheet_store_mut();
60 if let Some(pos) = store.sheets.iter().position(|s| s.name.as_ref() == name) {
61 store.sheets[pos] = sheet;
62 } else {
63 store.sheets.push(sheet);
64 }
65 }
66 let total_rows = self.rows.values().copied().sum();
67 Ok(ArrowBulkIngestSummary {
68 sheets: self.rows.len(),
69 total_rows,
70 })
71 }
72}
73
74pub struct ArrowBulkUpdateBuilder<'e, R: EvaluationContext> {
76 engine: &'e mut Engine<R>,
77 updates: FxHashMap<String, FxHashMap<usize, FxHashMap<usize, LiteralValue>>>,
79}
80
81impl<'e, R: EvaluationContext> ArrowBulkUpdateBuilder<'e, R> {
82 pub fn new(engine: &'e mut Engine<R>) -> Self {
83 Self {
84 engine,
85 updates: FxHashMap::default(),
86 }
87 }
88
89 pub fn update_cell(&mut self, sheet: &str, row: u32, col: u32, value: LiteralValue) {
90 let s = self.updates.entry(sheet.to_string()).or_default();
91 let c = s.entry(col.saturating_sub(1) as usize).or_default();
92 c.insert(row.saturating_sub(1) as usize, value);
93 }
94
95 pub fn finish(mut self) -> Result<usize, ExcelError> {
96 use std::sync::Arc;
97 let date_system = self.engine.config.date_system;
98 let mut total = 0usize;
99 for (sheet_name, by_col) in self.updates.drain() {
100 let maybe_sheet = self.engine.sheet_store_mut().sheet_mut(&sheet_name);
101 if maybe_sheet.is_none() {
102 continue;
103 }
104 let sheet = maybe_sheet.unwrap();
105 for (col0, rows_map) in by_col {
106 total += rows_map.len();
107 if col0 >= sheet.columns.len() {
108 continue;
109 }
110 let mut by_chunk: FxHashMap<usize, Vec<(usize, LiteralValue)>> =
112 FxHashMap::default();
113 for (row0, v) in rows_map {
114 if row0 >= sheet.nrows as usize {
115 sheet.ensure_row_capacity(row0 + 1);
116 }
117 if let Some((ch_idx, in_off)) = sheet.chunk_of_row(row0) {
118 by_chunk.entry(ch_idx).or_default().push((in_off, v));
119 }
120 }
121 for (ch_idx, mut items) in by_chunk {
122 let ch = &mut sheet.columns[col0].chunks[ch_idx];
123 let len = ch.type_tag.len();
124 let rebuild = items.len() > len / 50 || items.len() > 1024;
126 if !rebuild {
127 for (off, v) in items {
129 let ov = match v {
130 LiteralValue::Empty => crate::arrow_store::OverlayValue::Empty,
131 LiteralValue::Int(i) => {
132 crate::arrow_store::OverlayValue::Number(i as f64)
133 }
134 LiteralValue::Number(n) => {
135 crate::arrow_store::OverlayValue::Number(n)
136 }
137 LiteralValue::Boolean(b) => {
138 crate::arrow_store::OverlayValue::Boolean(b)
139 }
140 LiteralValue::Text(s) => {
141 crate::arrow_store::OverlayValue::Text(Arc::from(s))
142 }
143 LiteralValue::Error(e) => crate::arrow_store::OverlayValue::Error(
144 crate::arrow_store::map_error_code(e.kind),
145 ),
146 LiteralValue::Date(d) => {
147 let dt = d.and_hms_opt(0, 0, 0).unwrap();
148 let serial = crate::builtins::datetime::datetime_to_serial_for(
149 date_system,
150 &dt,
151 );
152 crate::arrow_store::OverlayValue::Number(serial)
153 }
154 LiteralValue::DateTime(dt) => {
155 let serial = crate::builtins::datetime::datetime_to_serial_for(
156 date_system,
157 &dt,
158 );
159 crate::arrow_store::OverlayValue::Number(serial)
160 }
161 LiteralValue::Time(t) => {
162 let serial = t.num_seconds_from_midnight() as f64 / 86_400.0;
163 crate::arrow_store::OverlayValue::Number(serial)
164 }
165 LiteralValue::Duration(d) => {
166 let serial = d.num_seconds() as f64 / 86_400.0;
167 crate::arrow_store::OverlayValue::Number(serial)
168 }
169 LiteralValue::Pending => crate::arrow_store::OverlayValue::Pending,
170 LiteralValue::Array(_) => crate::arrow_store::OverlayValue::Error(
171 crate::arrow_store::map_error_code(
172 formualizer_common::ExcelErrorKind::Value,
173 ),
174 ),
175 };
176 ch.overlay.set(off, ov);
177 }
178 } else {
179 use arrow_array::Array as _;
181 use arrow_array::builder::{
182 BooleanBuilder, Float64Builder, StringBuilder, UInt8Builder,
183 };
184 items.sort_by_key(|(o, _)| *o);
185 let mut tag_b = UInt8Builder::with_capacity(len);
186 let mut nb = Float64Builder::with_capacity(len);
187 let mut bb = BooleanBuilder::with_capacity(len);
188 let mut sb = StringBuilder::with_capacity(len, len * 8);
189 let mut eb = UInt8Builder::with_capacity(len);
190 let mut non_num = 0usize;
191 let mut non_bool = 0usize;
192 let mut non_text = 0usize;
193 let mut non_err = 0usize;
194 let mut it = items.into_iter().peekable();
195 for i in 0..len {
196 let upd = if it.peek().map(|(o, _)| *o == i).unwrap_or(false) {
197 Some(it.next().unwrap().1)
198 } else {
199 None
200 };
201 let val = if let Some(v) = upd {
202 v
203 } else {
204 let t = crate::arrow_store::TypeTag::from_u8(ch.type_tag.value(i));
206 match t {
207 crate::arrow_store::TypeTag::Empty => LiteralValue::Empty,
208 crate::arrow_store::TypeTag::Number
209 | crate::arrow_store::TypeTag::DateTime
210 | crate::arrow_store::TypeTag::Duration => {
211 if let Some(a) = &ch.numbers {
212 let fa = a
213 .as_any()
214 .downcast_ref::<arrow_array::Float64Array>()
215 .unwrap();
216 if fa.is_null(i) {
217 LiteralValue::Empty
218 } else {
219 LiteralValue::Number(fa.value(i))
220 }
221 } else {
222 LiteralValue::Empty
223 }
224 }
225 crate::arrow_store::TypeTag::Boolean => {
226 if let Some(a) = &ch.booleans {
227 let ba = a
228 .as_any()
229 .downcast_ref::<arrow_array::BooleanArray>()
230 .unwrap();
231 if ba.is_null(i) {
232 LiteralValue::Empty
233 } else {
234 LiteralValue::Boolean(ba.value(i))
235 }
236 } else {
237 LiteralValue::Empty
238 }
239 }
240 crate::arrow_store::TypeTag::Text => {
241 if let Some(a) = &ch.text {
242 let sa = a
243 .as_any()
244 .downcast_ref::<arrow_array::StringArray>()
245 .unwrap();
246 if sa.is_null(i) {
247 LiteralValue::Empty
248 } else {
249 LiteralValue::Text(sa.value(i).to_string())
250 }
251 } else {
252 LiteralValue::Empty
253 }
254 }
255 crate::arrow_store::TypeTag::Error => {
256 if let Some(a) = &ch.errors {
257 let ea = a
258 .as_any()
259 .downcast_ref::<arrow_array::UInt8Array>()
260 .unwrap();
261 if ea.is_null(i) {
262 LiteralValue::Empty
263 } else {
264 LiteralValue::Error(ExcelError::new(
265 crate::arrow_store::unmap_error_code(
266 ea.value(i),
267 ),
268 ))
269 }
270 } else {
271 LiteralValue::Empty
272 }
273 }
274 crate::arrow_store::TypeTag::Pending => LiteralValue::Pending,
275 }
276 };
277 match val {
278 LiteralValue::Empty => {
279 tag_b.append_value(crate::arrow_store::TypeTag::Empty as u8);
280 nb.append_null();
281 bb.append_null();
282 sb.append_null();
283 eb.append_null();
284 }
285 LiteralValue::Int(i) => {
286 tag_b.append_value(crate::arrow_store::TypeTag::Number as u8);
287 nb.append_value(i as f64);
288 non_num += 1;
289 bb.append_null();
290 sb.append_null();
291 eb.append_null();
292 }
293 LiteralValue::Number(n) => {
294 tag_b.append_value(crate::arrow_store::TypeTag::Number as u8);
295 nb.append_value(n);
296 non_num += 1;
297 bb.append_null();
298 sb.append_null();
299 eb.append_null();
300 }
301 LiteralValue::Boolean(b) => {
302 tag_b.append_value(crate::arrow_store::TypeTag::Boolean as u8);
303 nb.append_null();
304 bb.append_value(b);
305 non_bool += 1;
306 sb.append_null();
307 eb.append_null();
308 }
309 LiteralValue::Text(s) => {
310 tag_b.append_value(crate::arrow_store::TypeTag::Text as u8);
311 nb.append_null();
312 bb.append_null();
313 sb.append_value(&s);
314 non_text += 1;
315 eb.append_null();
316 }
317 LiteralValue::Error(e) => {
318 tag_b.append_value(crate::arrow_store::TypeTag::Error as u8);
319 nb.append_null();
320 bb.append_null();
321 sb.append_null();
322 eb.append_value(crate::arrow_store::map_error_code(e.kind));
323 non_err += 1;
324 }
325 LiteralValue::Date(d) => {
326 tag_b.append_value(crate::arrow_store::TypeTag::Number as u8);
327 let dt = d.and_hms_opt(0, 0, 0).unwrap();
328 let serial = crate::builtins::datetime::datetime_to_serial_for(
329 date_system,
330 &dt,
331 );
332 nb.append_value(serial);
333 non_num += 1;
334 bb.append_null();
335 sb.append_null();
336 eb.append_null();
337 }
338 LiteralValue::DateTime(dt) => {
339 tag_b.append_value(crate::arrow_store::TypeTag::Number as u8);
340 let serial = crate::builtins::datetime::datetime_to_serial_for(
341 date_system,
342 &dt,
343 );
344 nb.append_value(serial);
345 non_num += 1;
346 bb.append_null();
347 sb.append_null();
348 eb.append_null();
349 }
350 LiteralValue::Time(t) => {
351 tag_b.append_value(crate::arrow_store::TypeTag::Number as u8);
352 let serial = t.num_seconds_from_midnight() as f64 / 86_400.0;
353 nb.append_value(serial);
354 non_num += 1;
355 bb.append_null();
356 sb.append_null();
357 eb.append_null();
358 }
359 LiteralValue::Duration(d) => {
360 tag_b.append_value(crate::arrow_store::TypeTag::Number as u8);
361 let serial = d.num_seconds() as f64 / 86_400.0;
362 nb.append_value(serial);
363 non_num += 1;
364 bb.append_null();
365 sb.append_null();
366 eb.append_null();
367 }
368 LiteralValue::Pending => {
369 tag_b.append_value(crate::arrow_store::TypeTag::Pending as u8);
370 nb.append_null();
371 bb.append_null();
372 sb.append_null();
373 eb.append_null();
374 }
375 LiteralValue::Array(_) => {
376 tag_b.append_value(crate::arrow_store::TypeTag::Error as u8);
377 nb.append_null();
378 bb.append_null();
379 sb.append_null();
380 eb.append_value(crate::arrow_store::map_error_code(
381 formualizer_common::ExcelErrorKind::Value,
382 ));
383 non_err += 1;
384 }
385 }
386 }
387 ch.type_tag = Arc::new(tag_b.finish());
388 ch.numbers = if non_num == 0 {
389 None
390 } else {
391 Some(Arc::new(nb.finish()))
392 };
393 ch.booleans = if non_bool == 0 {
394 None
395 } else {
396 Some(Arc::new(bb.finish()))
397 };
398 ch.text = if non_text == 0 {
399 None
400 } else {
401 Some(Arc::new(sb.finish()))
402 };
403 ch.errors = if non_err == 0 {
404 None
405 } else {
406 Some(Arc::new(eb.finish()))
407 };
408 ch.meta.len = len;
409 ch.meta.non_null_num = non_num;
410 ch.meta.non_null_bool = non_bool;
411 ch.meta.non_null_text = non_text;
412 ch.meta.non_null_err = non_err;
413 ch.overlay.clear();
414 }
415 }
416 }
417 }
418 self.engine.mark_data_edited();
420 Ok(total)
421 }
422}
423
#[cfg(test)]
mod tests {
    use super::*;
    use crate::engine::EvalConfig;
    use crate::test_workbook::TestWorkbook;

    /// Ingesting two three-column rows with a chunk size of two yields one sheet whose
    /// columns each hold a single two-row chunk.
    #[test]
    fn arrow_bulk_ingest_basic() {
        let mut engine = Engine::new(TestWorkbook::default(), EvalConfig::default());

        let rows = [
            [
                LiteralValue::Number(1.0),
                LiteralValue::Text("a".into()),
                LiteralValue::Empty,
            ],
            [
                LiteralValue::Boolean(true),
                LiteralValue::Text("".into()),
                LiteralValue::Error(formualizer_common::ExcelError::new_value()),
            ],
        ];

        // Scope the builder so its mutable borrow of the engine ends before inspection.
        let summary = {
            let mut ingest = engine.begin_bulk_ingest_arrow();
            ingest.add_sheet("S", 3, 2);
            for row in &rows {
                ingest.append_row("S", row).unwrap();
            }
            ingest.finish().unwrap()
        };
        assert_eq!(summary.sheets, 1);
        assert_eq!(summary.total_rows, 2);

        let store = engine.sheet_store();
        let sheet = store.sheet("S").expect("arrow sheet present");
        assert_eq!(sheet.columns.len(), 3);
        assert_eq!(sheet.nrows, 2);
        for column in sheet.columns.iter() {
            assert_eq!(column.chunks.len(), 1);
            assert_eq!(column.chunks[0].len(), 2);
        }
    }
}