1use crate::lancedb::LanceDbStore;
7use crate::storage::arrow_convert::build_timestamp_column;
8use crate::storage::property_builder::PropertyColumnBuilder;
9use crate::storage::value_codec::CrdtDecodeMode;
10use anyhow::{Result, anyhow};
11use arrow_array::types::TimestampNanosecondType;
12use arrow_array::{Array, ArrayRef, PrimitiveArray, RecordBatch, UInt8Array, UInt64Array};
13use arrow_schema::{Field, Schema as ArrowSchema, TimeUnit};
14use futures::TryStreamExt;
15use lance::dataset::Dataset;
16use lancedb::Table;
17use lancedb::index::Index as LanceDbIndex;
18use lancedb::index::scalar::BTreeIndexBuilder;
19use std::collections::HashMap;
20use std::sync::Arc;
21use tracing::info;
22use uni_common::DataType;
23use uni_common::Properties;
24use uni_common::core::id::{Eid, Vid};
25use uni_common::core::schema::Schema;
26
/// Maximum number of rows a full-table compaction scan will materialize in
/// memory before `check_oom_guard` aborts the operation (see issue #143).
pub const DEFAULT_MAX_COMPACTION_ROWS: usize = 5_000_000;

/// Rough per-row heap footprint in bytes of a decoded `L1Entry`, used only
/// for the human-readable memory estimates in logs and the OOM-guard error.
pub const ENTRY_SIZE_ESTIMATE: usize = 145;
36pub fn check_oom_guard(
41 row_count: usize,
42 max_rows: usize,
43 entity_name: &str,
44 qualifier: &str,
45) -> Result<()> {
46 if row_count > max_rows {
47 let estimated_bytes = row_count * ENTRY_SIZE_ESTIMATE;
48 return Err(anyhow!(
49 "Table for {}_{} has {} rows (estimated {:.2} GB in memory), exceeding max_compaction_rows limit of {}. \
50 Use chunked compaction or increase the limit. See issue #143.",
51 entity_name,
52 qualifier,
53 row_count,
54 estimated_bytes as f64 / (1024.0 * 1024.0 * 1024.0),
55 max_rows
56 ));
57 }
58 Ok(())
59}
60
/// Mutation kind recorded for a delta entry.
///
/// The discriminants are persisted in the `op` UInt8 column, so their
/// numeric values must remain stable across releases.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Op {
    /// Edge insertion; stored as 0.
    Insert = 0,
    /// Edge deletion (tombstone); stored as 1.
    Delete = 1,
}
69
/// One decoded delta row for an edge, as written to / read from a
/// per-(edge_type, direction) delta table.
#[derive(Clone, Debug)]
pub struct L1Entry {
    // Source vertex id of the edge.
    pub src_vid: Vid,
    // Destination vertex id of the edge.
    pub dst_vid: Vid,
    // Edge id.
    pub eid: Eid,
    // Insert or delete marker (persisted in the `op` column).
    pub op: Op,
    // Version number used to order deltas for the same adjacency key.
    pub version: u64,
    // Edge properties decoded from the typed property columns.
    pub properties: Properties,
    // Creation timestamp (nanoseconds, UTC) when present.
    pub created_at: Option<i64>,
    // Last-update timestamp (nanoseconds, UTC) when present.
    pub updated_at: Option<i64>,
}
84
/// Handle to the delta table of one (edge_type, direction) pair, stored
/// under `<base_uri>/deltas/<edge_type>_<direction>`.
#[derive(Debug)]
pub struct DeltaDataset {
    // Full URI of the backing Lance dataset.
    uri: String,
    // Edge type name; selects the property schema for encode/decode.
    edge_type: String,
    // Direction tag: "fwd" keys scans/sorting by src_vid, anything else by dst_vid.
    direction: String,
}
95
96impl DeltaDataset {
97 pub fn new(base_uri: &str, edge_type: &str, direction: &str) -> Self {
99 let uri = format!("{}/deltas/{}_{}", base_uri, edge_type, direction);
100 Self {
101 uri,
102 edge_type: edge_type.to_string(),
103 direction: direction.to_string(),
104 }
105 }
106
107 pub async fn open(&self) -> Result<Arc<Dataset>> {
109 self.open_at(None).await
110 }
111
112 pub async fn open_at(&self, version: Option<u64>) -> Result<Arc<Dataset>> {
114 let mut ds = Dataset::open(&self.uri).await?;
115 if let Some(v) = version {
116 ds = ds.checkout_version(v).await?;
117 }
118 Ok(Arc::new(ds))
119 }
120
121 pub fn get_arrow_schema(&self, schema: &Schema) -> Result<Arc<ArrowSchema>> {
123 let mut fields = vec![
124 Field::new("src_vid", arrow_schema::DataType::UInt64, false),
125 Field::new("dst_vid", arrow_schema::DataType::UInt64, false),
126 Field::new("eid", arrow_schema::DataType::UInt64, false),
127 Field::new("op", arrow_schema::DataType::UInt8, false), Field::new("_version", arrow_schema::DataType::UInt64, false),
129 Field::new(
131 "_created_at",
132 arrow_schema::DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
133 true,
134 ),
135 Field::new(
136 "_updated_at",
137 arrow_schema::DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
138 true,
139 ),
140 ];
141
142 if let Some(type_props) = schema.properties.get(&self.edge_type) {
143 let mut sorted_props: Vec<_> = type_props.iter().collect();
144 sorted_props.sort_by_key(|(name, _)| *name);
145
146 for (name, meta) in sorted_props {
147 fields.push(Field::new(name, meta.r#type.to_arrow(), meta.nullable));
148 }
149 }
150
151 fields.push(Field::new(
153 "overflow_json",
154 arrow_schema::DataType::LargeBinary,
155 true,
156 ));
157
158 Ok(Arc::new(ArrowSchema::new(fields)))
159 }
160
161 pub fn build_record_batch(&self, entries: &[L1Entry], schema: &Schema) -> Result<RecordBatch> {
163 let arrow_schema = self.get_arrow_schema(schema)?;
164
165 let mut src_vids = Vec::with_capacity(entries.len());
166 let mut dst_vids = Vec::with_capacity(entries.len());
167 let mut eids = Vec::with_capacity(entries.len());
168 let mut ops = Vec::with_capacity(entries.len());
169 let mut versions = Vec::with_capacity(entries.len());
170
171 for entry in entries {
172 src_vids.push(entry.src_vid.as_u64());
173 dst_vids.push(entry.dst_vid.as_u64());
174 eids.push(entry.eid.as_u64());
175 ops.push(entry.op as u8);
176 versions.push(entry.version);
177 }
178
179 let mut columns: Vec<ArrayRef> = vec![
180 Arc::new(UInt64Array::from(src_vids)),
181 Arc::new(UInt64Array::from(dst_vids)),
182 Arc::new(UInt64Array::from(eids)),
183 Arc::new(UInt8Array::from(ops)),
184 Arc::new(UInt64Array::from(versions)),
185 ];
186
187 columns.push(build_timestamp_column(entries.iter().map(|e| e.created_at)));
189 columns.push(build_timestamp_column(entries.iter().map(|e| e.updated_at)));
190
191 let deleted_flags: Vec<bool> = entries.iter().map(|e| e.op == Op::Delete).collect();
194
195 let prop_columns = PropertyColumnBuilder::new(schema, &self.edge_type, entries.len())
197 .with_deleted(&deleted_flags)
198 .build(|i| &entries[i].properties)?;
199
200 columns.extend(prop_columns);
201
202 let overflow_column = self.build_overflow_json_column(entries, schema)?;
204 columns.push(overflow_column);
205
206 RecordBatch::try_new(arrow_schema, columns).map_err(|e| anyhow!(e))
207 }
208
209 fn build_overflow_json_column(&self, entries: &[L1Entry], schema: &Schema) -> Result<ArrayRef> {
211 crate::storage::property_builder::build_overflow_json_column(
212 entries.len(),
213 &self.edge_type,
214 schema,
215 |i| &entries[i].properties,
216 &[],
217 )
218 }
219
220 pub async fn scan_all(&self, schema: &Schema) -> Result<Vec<L1Entry>> {
222 self.scan_all_with_limit(schema, DEFAULT_MAX_COMPACTION_ROWS)
223 .await
224 }
225
226 pub async fn scan_all_with_limit(
228 &self,
229 schema: &Schema,
230 max_rows: usize,
231 ) -> Result<Vec<L1Entry>> {
232 let ds = match self.open().await {
233 Ok(ds) => ds,
234 Err(_) => return Ok(vec![]),
235 };
236
237 let row_count = ds.count_rows(None).await?;
238 check_oom_guard(row_count, max_rows, &self.edge_type, &self.direction)?;
239
240 info!(
241 edge_type = %self.edge_type,
242 direction = %self.direction,
243 row_count,
244 estimated_bytes = row_count * ENTRY_SIZE_ESTIMATE,
245 "Starting delta scan for compaction"
246 );
247
248 let mut stream = ds.scan().try_into_stream().await?;
249
250 let mut entries = Vec::new();
251
252 while let Some(batch) = stream.try_next().await? {
253 let mut batch_entries = self.parse_batch(&batch, schema)?;
254 entries.append(&mut batch_entries);
255 }
256
257 self.sort_entries(&mut entries);
258
259 Ok(entries)
260 }
261
262 fn sort_entries(&self, entries: &mut [L1Entry]) {
264 let is_fwd = self.direction == "fwd";
265 entries.sort_by(|a, b| {
266 let key_a = if is_fwd { a.src_vid } else { a.dst_vid };
267 let key_b = if is_fwd { b.src_vid } else { b.dst_vid };
268 key_a.cmp(&key_b).then(a.version.cmp(&b.version))
269 });
270 }
271
272 fn parse_batch(&self, batch: &RecordBatch, schema: &Schema) -> Result<Vec<L1Entry>> {
273 let src_vids = batch
274 .column_by_name("src_vid")
275 .ok_or(anyhow!("Missing src_vid"))?
276 .as_any()
277 .downcast_ref::<UInt64Array>()
278 .ok_or(anyhow!("Invalid src_vid type"))?;
279 let dst_vids = batch
280 .column_by_name("dst_vid")
281 .ok_or(anyhow!("Missing dst_vid"))?
282 .as_any()
283 .downcast_ref::<UInt64Array>()
284 .ok_or(anyhow!("Invalid dst_vid type"))?;
285 let eids = batch
286 .column_by_name("eid")
287 .ok_or(anyhow!("Missing eid"))?
288 .as_any()
289 .downcast_ref::<UInt64Array>()
290 .ok_or(anyhow!("Invalid eid type"))?;
291 let ops = batch
292 .column_by_name("op")
293 .ok_or(anyhow!("Missing op"))?
294 .as_any()
295 .downcast_ref::<UInt8Array>()
296 .ok_or(anyhow!("Invalid op type"))?;
297 let versions = batch
298 .column_by_name("_version")
299 .ok_or(anyhow!("Missing _version"))?
300 .as_any()
301 .downcast_ref::<UInt64Array>()
302 .ok_or(anyhow!("Invalid _version type"))?;
303
304 let created_at_col = batch.column_by_name("_created_at").and_then(|c| {
306 c.as_any()
307 .downcast_ref::<PrimitiveArray<TimestampNanosecondType>>()
308 });
309 let updated_at_col = batch.column_by_name("_updated_at").and_then(|c| {
310 c.as_any()
311 .downcast_ref::<PrimitiveArray<TimestampNanosecondType>>()
312 });
313
314 let mut prop_cols = Vec::new();
316 if let Some(type_props) = schema.properties.get(&self.edge_type) {
317 for (name, meta) in type_props {
318 if let Some(col) = batch.column_by_name(name) {
319 prop_cols.push((name, meta.r#type.clone(), col));
320 }
321 }
322 }
323
324 let mut entries = Vec::with_capacity(batch.num_rows());
325
326 for i in 0..batch.num_rows() {
327 let op = match ops.value(i) {
328 0 => Op::Insert,
329 1 => Op::Delete,
330 _ => continue, };
332
333 let properties = self.extract_properties(&prop_cols, i)?;
334
335 let read_ts = |col: Option<&PrimitiveArray<TimestampNanosecondType>>| {
337 col.and_then(|c| (!c.is_null(i)).then(|| c.value(i)))
338 };
339 let created_at = read_ts(created_at_col);
340 let updated_at = read_ts(updated_at_col);
341
342 entries.push(L1Entry {
343 src_vid: Vid::from(src_vids.value(i)),
344 dst_vid: Vid::from(dst_vids.value(i)),
345 eid: Eid::from(eids.value(i)),
346 op,
347 version: versions.value(i),
348 properties,
349 created_at,
350 updated_at,
351 });
352 }
353 Ok(entries)
354 }
355
356 fn extract_properties(
358 &self,
359 prop_cols: &[(&String, DataType, &ArrayRef)],
360 row: usize,
361 ) -> Result<Properties> {
362 let mut properties = Properties::new();
363 for (name, dtype, col) in prop_cols {
364 if col.is_null(row) {
365 continue;
366 }
367 let val = Self::value_from_column(col.as_ref(), dtype, row)?;
368 properties.insert(name.to_string(), uni_common::Value::from(val));
369 }
370 Ok(properties)
371 }
372
373 fn value_from_column(
375 col: &dyn arrow_array::Array,
376 dtype: &uni_common::DataType,
377 row: usize,
378 ) -> Result<serde_json::Value> {
379 crate::storage::value_codec::value_from_column(col, dtype, row, CrdtDecodeMode::Lenient)
380 }
381
382 fn filter_column(&self) -> &'static str {
384 if self.direction == "fwd" {
385 "src_vid"
386 } else {
387 "dst_vid"
388 }
389 }
390
391 pub async fn open_lancedb(&self, store: &LanceDbStore) -> Result<Table> {
397 store
398 .open_delta_table(&self.edge_type, &self.direction)
399 .await
400 }
401
402 pub async fn open_or_create_lancedb(
404 &self,
405 store: &LanceDbStore,
406 schema: &Schema,
407 ) -> Result<Table> {
408 let arrow_schema = self.get_arrow_schema(schema)?;
409 store
410 .open_or_create_delta_table(&self.edge_type, &self.direction, arrow_schema)
411 .await
412 }
413
414 pub async fn write_run_lancedb(
418 &self,
419 store: &LanceDbStore,
420 batch: RecordBatch,
421 ) -> Result<Table> {
422 let table_name = LanceDbStore::delta_table_name(&self.edge_type, &self.direction);
423
424 if store.table_exists(&table_name).await? {
425 let table = store.open_table(&table_name).await?;
426 store.append_to_table(&table, vec![batch]).await?;
427 Ok(table)
428 } else {
429 store.create_table(&table_name, vec![batch]).await
430 }
431 }
432
433 pub async fn ensure_eid_index_lancedb(&self, table: &Table) -> Result<()> {
435 let indices = table
436 .list_indices()
437 .await
438 .map_err(|e| anyhow!("Failed to list indices: {}", e))?;
439
440 if !indices
441 .iter()
442 .any(|idx| idx.columns.contains(&"eid".to_string()))
443 {
444 log::info!(
445 "Creating eid BTree index for edge type '{}' via LanceDB",
446 self.edge_type
447 );
448 if let Err(e) = table
449 .create_index(&["eid"], LanceDbIndex::BTree(BTreeIndexBuilder::default()))
450 .execute()
451 .await
452 {
453 log::warn!(
454 "Failed to create eid index for '{}' via LanceDB: {}",
455 self.edge_type,
456 e
457 );
458 }
459 }
460
461 Ok(())
462 }
463
464 pub fn lancedb_table_name(&self) -> String {
466 LanceDbStore::delta_table_name(&self.edge_type, &self.direction)
467 }
468
469 pub async fn scan_all_lancedb(
473 &self,
474 store: &LanceDbStore,
475 schema: &Schema,
476 ) -> Result<Vec<L1Entry>> {
477 self.scan_all_lancedb_with_limit(store, schema, DEFAULT_MAX_COMPACTION_ROWS)
478 .await
479 }
480
481 pub async fn scan_all_lancedb_with_limit(
483 &self,
484 store: &LanceDbStore,
485 schema: &Schema,
486 max_rows: usize,
487 ) -> Result<Vec<L1Entry>> {
488 let table = match self.open_lancedb(store).await {
489 Ok(t) => t,
490 Err(_) => return Ok(vec![]),
491 };
492
493 let row_count = table.count_rows(None).await?;
494 check_oom_guard(row_count, max_rows, &self.edge_type, &self.direction)?;
495
496 info!(
497 edge_type = %self.edge_type,
498 direction = %self.direction,
499 row_count,
500 estimated_bytes = row_count * ENTRY_SIZE_ESTIMATE,
501 "Starting delta scan for compaction (LanceDB)"
502 );
503
504 use lancedb::query::ExecutableQuery;
505 let stream = table.query().execute().await?;
506 let batches: Vec<arrow_array::RecordBatch> = stream.try_collect().await?;
507
508 let mut entries = Vec::new();
509 for batch in batches {
510 let mut batch_entries = self.parse_batch(&batch, schema)?;
511 entries.append(&mut batch_entries);
512 }
513
514 self.sort_entries(&mut entries);
515
516 Ok(entries)
517 }
518
519 pub async fn replace_lancedb(&self, store: &LanceDbStore, batch: RecordBatch) -> Result<Table> {
523 let table_name = self.lancedb_table_name();
524 let arrow_schema = batch.schema();
525 store
526 .replace_table_atomic(&table_name, vec![batch], arrow_schema)
527 .await
528 }
529
530 pub async fn read_deltas_lancedb(
534 &self,
535 store: &LanceDbStore,
536 vid: Vid,
537 schema: &Schema,
538 version_hwm: Option<u64>,
539 ) -> Result<Vec<L1Entry>> {
540 let table = match self.open_lancedb(store).await {
541 Ok(t) => t,
542 Err(_) => return Ok(vec![]),
543 };
544
545 use lancedb::query::{ExecutableQuery, QueryBase};
546
547 let base_filter = format!("{} = {}", self.filter_column(), vid.as_u64());
548
549 let final_filter = if let Some(hwm) = version_hwm {
551 format!("({}) AND (_version <= {})", base_filter, hwm)
552 } else {
553 base_filter
554 };
555
556 let query = table.query().only_if(final_filter);
557 let stream = query.execute().await?;
558 let batches: Vec<arrow_array::RecordBatch> = stream.try_collect().await?;
559
560 let mut entries = Vec::new();
561 for batch in batches {
562 let mut batch_entries = self.parse_batch(&batch, schema)?;
563 entries.append(&mut batch_entries);
564 }
565
566 Ok(entries)
567 }
568
569 pub async fn read_deltas_lancedb_batch(
574 &self,
575 store: &LanceDbStore,
576 vids: &[Vid],
577 schema: &Schema,
578 version_hwm: Option<u64>,
579 ) -> Result<HashMap<Vid, Vec<L1Entry>>> {
580 if vids.is_empty() {
581 return Ok(HashMap::new());
582 }
583
584 let table = match self.open_lancedb(store).await {
585 Ok(t) => t,
586 Err(_) => return Ok(HashMap::new()),
587 };
588
589 use lancedb::query::{ExecutableQuery, QueryBase};
590
591 let vid_list = vids
593 .iter()
594 .map(|v| v.as_u64().to_string())
595 .collect::<Vec<_>>()
596 .join(", ");
597 let mut filter = format!("{} IN ({})", self.filter_column(), vid_list);
598
599 if let Some(hwm) = version_hwm {
601 filter = format!("({}) AND (_version <= {})", filter, hwm);
602 }
603
604 let query = table.query().only_if(filter);
605 let stream = query.execute().await?;
606 let batches: Vec<arrow_array::RecordBatch> = stream.try_collect().await?;
607
608 let is_fwd = self.direction == "fwd";
610 let mut result: HashMap<Vid, Vec<L1Entry>> = HashMap::new();
611 for batch in batches {
612 let entries = self.parse_batch(&batch, schema)?;
613 for entry in entries {
614 let vid = if is_fwd { entry.src_vid } else { entry.dst_vid };
615 result.entry(vid).or_default().push(entry);
616 }
617 }
618
619 Ok(result)
620 }
621}
622
#[cfg(test)]
mod tests {
    use super::*;

    /// One GiB in bytes, as f64 for the estimate math below.
    const GIB: f64 = 1024.0 * 1024.0 * 1024.0;

    #[test]
    #[expect(
        clippy::assertions_on_constants,
        reason = "Validating configuration constants intentionally"
    )]
    fn test_constants_are_reasonable() {
        assert_eq!(DEFAULT_MAX_COMPACTION_ROWS, 5_000_000);

        assert!(ENTRY_SIZE_ESTIMATE >= 100, "Entry size estimate too low");
        assert!(ENTRY_SIZE_ESTIMATE <= 300, "Entry size estimate too high");

        // A full default-sized scan should stay comfortably under a gigabyte.
        let worst_case_bytes = DEFAULT_MAX_COMPACTION_ROWS * ENTRY_SIZE_ESTIMATE;
        let worst_case_gb = worst_case_bytes as f64 / GIB;
        assert!(
            worst_case_gb < 1.0,
            "5M entries should fit in under 1GB with current estimate"
        );
    }

    #[test]
    fn test_memory_estimate_formatting() {
        // Twice the default limit should land in the 1-2 GiB range.
        let rows: usize = 10_000_000;
        let estimated_gb = (rows * ENTRY_SIZE_ESTIMATE) as f64 / GIB;
        assert!(
            estimated_gb > 1.0 && estimated_gb < 2.0,
            "10M rows should be 1-2 GB"
        );
    }
}