1use crate::lancedb::LanceDbStore;
5use crate::storage::arrow_convert::build_timestamp_column;
6use crate::storage::property_builder::PropertyColumnBuilder;
7use crate::storage::value_codec::CrdtDecodeMode;
8use anyhow::{Result, anyhow};
9use arrow_array::types::TimestampNanosecondType;
10use arrow_array::{Array, ArrayRef, PrimitiveArray, RecordBatch, UInt8Array, UInt64Array};
11use arrow_schema::{Field, Schema as ArrowSchema, TimeUnit};
12use futures::TryStreamExt;
13use lance::dataset::Dataset;
14use lancedb::Table;
15use lancedb::index::Index as LanceDbIndex;
16use lancedb::index::scalar::BTreeIndexBuilder;
17use std::collections::HashMap;
18use std::sync::Arc;
19use tracing::info;
20use uni_common::DataType;
21use uni_common::Properties;
22use uni_common::core::id::{Eid, Vid};
23use uni_common::core::schema::Schema;
24
/// Default cap on the number of delta rows a single compaction is allowed to
/// load into memory; enforced by [`check_oom_guard`].
pub const DEFAULT_MAX_COMPACTION_ROWS: usize = 5_000_000;

/// Rough estimate of the in-memory size of one [`L1Entry`], in bytes, used
/// only for the OOM-guard size projection and log output.
pub const ENTRY_SIZE_ESTIMATE: usize = 145;
34pub fn check_oom_guard(
39 row_count: usize,
40 max_rows: usize,
41 entity_name: &str,
42 qualifier: &str,
43) -> Result<()> {
44 if row_count > max_rows {
45 let estimated_bytes = row_count * ENTRY_SIZE_ESTIMATE;
46 return Err(anyhow!(
47 "Table for {}_{} has {} rows (estimated {:.2} GB in memory), exceeding max_compaction_rows limit of {}. \
48 Use chunked compaction or increase the limit. See issue #143.",
49 entity_name,
50 qualifier,
51 row_count,
52 estimated_bytes as f64 / (1024.0 * 1024.0 * 1024.0),
53 max_rows
54 ));
55 }
56 Ok(())
57}
58
/// Delta operation kind, persisted as a `u8` discriminant in the `op` column.
///
/// Derives `Copy`/`Eq` in addition to the original set: the enum is fieldless,
/// so copying is free and call sites no longer need `.clone()` to cast it.
/// This is strictly backward-compatible (existing `.clone()` calls still work).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Op {
    /// Edge insertion (discriminant 0).
    Insert = 0,
    /// Edge deletion / tombstone (discriminant 1).
    Delete = 1,
}
64
/// One edge delta (insert or delete) as stored in / read from a delta run.
#[derive(Clone, Debug)]
pub struct L1Entry {
    /// Source vertex id (`src_vid` column).
    pub src_vid: Vid,
    /// Destination vertex id (`dst_vid` column).
    pub dst_vid: Vid,
    /// Edge id (`eid` column).
    pub eid: Eid,
    /// Whether this delta inserts or deletes the edge.
    pub op: Op,
    /// Version number (`_version` column); used as the sort tiebreaker
    /// within a vertex key during scans (see `sort_entries`).
    pub version: u64,
    /// Edge properties carried by this delta.
    pub properties: Properties,
    /// Optional creation timestamp; stored as a nullable
    /// nanosecond-precision UTC timestamp column (`_created_at`).
    pub created_at: Option<i64>,
    /// Optional last-update timestamp; same encoding as `created_at`
    /// (`_updated_at` column).
    pub updated_at: Option<i64>,
}
78
/// Handle to the delta dataset for one `(edge_type, direction)` pair.
pub struct DeltaDataset {
    /// Lance dataset URI: `<base_uri>/deltas/<edge_type>_<direction>`.
    uri: String,
    /// Edge type name; also selects the property schema entries used
    /// when building/parsing record batches.
    edge_type: String,
    /// Direction tag; `"fwd"` keys rows by `src_vid`, any other value
    /// (presumably the reverse direction) keys by `dst_vid`.
    direction: String,
}
84
85impl DeltaDataset {
86 pub fn new(base_uri: &str, edge_type: &str, direction: &str) -> Self {
87 let uri = format!("{}/deltas/{}_{}", base_uri, edge_type, direction);
88 Self {
89 uri,
90 edge_type: edge_type.to_string(),
91 direction: direction.to_string(),
92 }
93 }
94
    /// Opens the underlying Lance dataset at its latest version
    /// (equivalent to [`Self::open_at`] with `None`).
    pub async fn open(&self) -> Result<Arc<Dataset>> {
        self.open_at(None).await
    }
98
99 pub async fn open_at(&self, version: Option<u64>) -> Result<Arc<Dataset>> {
100 let mut ds = Dataset::open(&self.uri).await?;
101 if let Some(v) = version {
102 ds = ds.checkout_version(v).await?;
103 }
104 Ok(Arc::new(ds))
105 }
106
107 pub fn get_arrow_schema(&self, schema: &Schema) -> Result<Arc<ArrowSchema>> {
108 let mut fields = vec![
109 Field::new("src_vid", arrow_schema::DataType::UInt64, false),
110 Field::new("dst_vid", arrow_schema::DataType::UInt64, false),
111 Field::new("eid", arrow_schema::DataType::UInt64, false),
112 Field::new("op", arrow_schema::DataType::UInt8, false), Field::new("_version", arrow_schema::DataType::UInt64, false),
114 Field::new(
116 "_created_at",
117 arrow_schema::DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
118 true,
119 ),
120 Field::new(
121 "_updated_at",
122 arrow_schema::DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
123 true,
124 ),
125 ];
126
127 if let Some(type_props) = schema.properties.get(&self.edge_type) {
128 let mut sorted_props: Vec<_> = type_props.iter().collect();
129 sorted_props.sort_by_key(|(name, _)| *name);
130
131 for (name, meta) in sorted_props {
132 fields.push(Field::new(name, meta.r#type.to_arrow(), meta.nullable));
133 }
134 }
135
136 fields.push(Field::new(
138 "overflow_json",
139 arrow_schema::DataType::LargeBinary,
140 true,
141 ));
142
143 Ok(Arc::new(ArrowSchema::new(fields)))
144 }
145
146 pub fn build_record_batch(&self, entries: &[L1Entry], schema: &Schema) -> Result<RecordBatch> {
147 let arrow_schema = self.get_arrow_schema(schema)?;
148
149 let mut src_vids = Vec::with_capacity(entries.len());
150 let mut dst_vids = Vec::with_capacity(entries.len());
151 let mut eids = Vec::with_capacity(entries.len());
152 let mut ops = Vec::with_capacity(entries.len());
153 let mut versions = Vec::with_capacity(entries.len());
154
155 for entry in entries {
156 src_vids.push(entry.src_vid.as_u64());
157 dst_vids.push(entry.dst_vid.as_u64());
158 eids.push(entry.eid.as_u64());
159 ops.push(entry.op.clone() as u8);
160 versions.push(entry.version);
161 }
162
163 let mut columns: Vec<ArrayRef> = vec![
164 Arc::new(UInt64Array::from(src_vids)),
165 Arc::new(UInt64Array::from(dst_vids)),
166 Arc::new(UInt64Array::from(eids)),
167 Arc::new(UInt8Array::from(ops)),
168 Arc::new(UInt64Array::from(versions)),
169 ];
170
171 columns.push(build_timestamp_column(entries.iter().map(|e| e.created_at)));
173 columns.push(build_timestamp_column(entries.iter().map(|e| e.updated_at)));
174
175 let deleted_flags: Vec<bool> = entries.iter().map(|e| e.op == Op::Delete).collect();
178
179 let prop_columns = PropertyColumnBuilder::new(schema, &self.edge_type, entries.len())
181 .with_deleted(&deleted_flags)
182 .build(|i| &entries[i].properties)?;
183
184 columns.extend(prop_columns);
185
186 let overflow_column = self.build_overflow_json_column(entries, schema)?;
188 columns.push(overflow_column);
189
190 RecordBatch::try_new(arrow_schema, columns).map_err(|e| anyhow!(e))
191 }
192
    /// Builds the trailing `overflow_json` column for `entries`.
    ///
    /// Thin delegation to the shared property-builder helper. The final `&[]`
    /// argument passes an empty slice — presumably "no excluded property
    /// names"; confirm against `property_builder::build_overflow_json_column`.
    fn build_overflow_json_column(&self, entries: &[L1Entry], schema: &Schema) -> Result<ArrayRef> {
        crate::storage::property_builder::build_overflow_json_column(
            entries.len(),
            &self.edge_type,
            schema,
            |i| &entries[i].properties,
            &[],
        )
    }
203
    /// Scans every delta entry, capped at [`DEFAULT_MAX_COMPACTION_ROWS`].
    ///
    /// See [`Self::scan_all_with_limit`] for the limit and sorting semantics.
    pub async fn scan_all(&self, schema: &Schema) -> Result<Vec<L1Entry>> {
        self.scan_all_with_limit(schema, DEFAULT_MAX_COMPACTION_ROWS)
            .await
    }
208
209 pub async fn scan_all_with_limit(
211 &self,
212 schema: &Schema,
213 max_rows: usize,
214 ) -> Result<Vec<L1Entry>> {
215 let ds = match self.open().await {
216 Ok(ds) => ds,
217 Err(_) => return Ok(vec![]),
218 };
219
220 let row_count = ds.count_rows(None).await?;
221 check_oom_guard(row_count, max_rows, &self.edge_type, &self.direction)?;
222
223 info!(
224 edge_type = %self.edge_type,
225 direction = %self.direction,
226 row_count,
227 estimated_bytes = row_count * ENTRY_SIZE_ESTIMATE,
228 "Starting delta scan for compaction"
229 );
230
231 let mut stream = ds.scan().try_into_stream().await?;
232
233 let mut entries = Vec::new();
234
235 while let Some(batch) = stream.try_next().await? {
236 let mut batch_entries = self.parse_batch(&batch, schema)?;
237 entries.append(&mut batch_entries);
238 }
239
240 self.sort_entries(&mut entries);
241
242 Ok(entries)
243 }
244
245 fn sort_entries(&self, entries: &mut [L1Entry]) {
247 let is_fwd = self.direction == "fwd";
248 entries.sort_by(|a, b| {
249 let key_a = if is_fwd { a.src_vid } else { a.dst_vid };
250 let key_b = if is_fwd { b.src_vid } else { b.dst_vid };
251 key_a.cmp(&key_b).then(a.version.cmp(&b.version))
252 });
253 }
254
255 fn parse_batch(&self, batch: &RecordBatch, schema: &Schema) -> Result<Vec<L1Entry>> {
256 let src_vids = batch
257 .column_by_name("src_vid")
258 .ok_or(anyhow!("Missing src_vid"))?
259 .as_any()
260 .downcast_ref::<UInt64Array>()
261 .ok_or(anyhow!("Invalid src_vid type"))?;
262 let dst_vids = batch
263 .column_by_name("dst_vid")
264 .ok_or(anyhow!("Missing dst_vid"))?
265 .as_any()
266 .downcast_ref::<UInt64Array>()
267 .ok_or(anyhow!("Invalid dst_vid type"))?;
268 let eids = batch
269 .column_by_name("eid")
270 .ok_or(anyhow!("Missing eid"))?
271 .as_any()
272 .downcast_ref::<UInt64Array>()
273 .ok_or(anyhow!("Invalid eid type"))?;
274 let ops = batch
275 .column_by_name("op")
276 .ok_or(anyhow!("Missing op"))?
277 .as_any()
278 .downcast_ref::<UInt8Array>()
279 .ok_or(anyhow!("Invalid op type"))?;
280 let versions = batch
281 .column_by_name("_version")
282 .ok_or(anyhow!("Missing _version"))?
283 .as_any()
284 .downcast_ref::<UInt64Array>()
285 .ok_or(anyhow!("Invalid _version type"))?;
286
287 let created_at_col = batch.column_by_name("_created_at").and_then(|c| {
289 c.as_any()
290 .downcast_ref::<PrimitiveArray<TimestampNanosecondType>>()
291 });
292 let updated_at_col = batch.column_by_name("_updated_at").and_then(|c| {
293 c.as_any()
294 .downcast_ref::<PrimitiveArray<TimestampNanosecondType>>()
295 });
296
297 let mut prop_cols = Vec::new();
299 if let Some(type_props) = schema.properties.get(&self.edge_type) {
300 for (name, meta) in type_props {
301 if let Some(col) = batch.column_by_name(name) {
302 prop_cols.push((name, meta.r#type.clone(), col));
303 }
304 }
305 }
306
307 let mut entries = Vec::with_capacity(batch.num_rows());
308
309 for i in 0..batch.num_rows() {
310 let op = match ops.value(i) {
311 0 => Op::Insert,
312 1 => Op::Delete,
313 _ => continue, };
315
316 let properties = self.extract_properties(&prop_cols, i)?;
317
318 let read_ts = |col: Option<&PrimitiveArray<TimestampNanosecondType>>| {
320 col.and_then(|c| (!c.is_null(i)).then(|| c.value(i)))
321 };
322 let created_at = read_ts(created_at_col);
323 let updated_at = read_ts(updated_at_col);
324
325 entries.push(L1Entry {
326 src_vid: Vid::from(src_vids.value(i)),
327 dst_vid: Vid::from(dst_vids.value(i)),
328 eid: Eid::from(eids.value(i)),
329 op,
330 version: versions.value(i),
331 properties,
332 created_at,
333 updated_at,
334 });
335 }
336 Ok(entries)
337 }
338
339 fn extract_properties(
341 &self,
342 prop_cols: &[(&String, DataType, &ArrayRef)],
343 row: usize,
344 ) -> Result<Properties> {
345 let mut properties = Properties::new();
346 for (name, dtype, col) in prop_cols {
347 if col.is_null(row) {
348 continue;
349 }
350 let val = Self::value_from_column(col.as_ref(), dtype, row)?;
351 properties.insert(name.to_string(), uni_common::Value::from(val));
352 }
353 Ok(properties)
354 }
355
    /// Decodes one cell of an Arrow column into a JSON value.
    ///
    /// Thin delegation to the shared value codec, using lenient CRDT
    /// decoding (see `CrdtDecodeMode::Lenient` for what that tolerates).
    fn value_from_column(
        col: &dyn arrow_array::Array,
        dtype: &uni_common::DataType,
        row: usize,
    ) -> Result<serde_json::Value> {
        crate::storage::value_codec::value_from_column(col, dtype, row, CrdtDecodeMode::Lenient)
    }
364
365 fn filter_column(&self) -> &'static str {
367 if self.direction == "fwd" {
368 "src_vid"
369 } else {
370 "dst_vid"
371 }
372 }
373
    /// Opens this delta table through the LanceDB `store`.
    ///
    /// Propagates the store's error when the table cannot be opened; callers
    /// in this module typically treat that as "no deltas yet" (see
    /// `scan_all_lancedb_with_limit`).
    pub async fn open_lancedb(&self, store: &LanceDbStore) -> Result<Table> {
        store
            .open_delta_table(&self.edge_type, &self.direction)
            .await
    }
384
    /// Opens this delta table through LanceDB, creating it with the Arrow
    /// schema derived from `schema` (via [`Self::get_arrow_schema`]) if it
    /// does not exist yet.
    pub async fn open_or_create_lancedb(
        &self,
        store: &LanceDbStore,
        schema: &Schema,
    ) -> Result<Table> {
        let arrow_schema = self.get_arrow_schema(schema)?;
        store
            .open_or_create_delta_table(&self.edge_type, &self.direction, arrow_schema)
            .await
    }
396
397 pub async fn write_run_lancedb(
401 &self,
402 store: &LanceDbStore,
403 batch: RecordBatch,
404 ) -> Result<Table> {
405 let table_name = LanceDbStore::delta_table_name(&self.edge_type, &self.direction);
406
407 if store.table_exists(&table_name).await? {
408 let table = store.open_table(&table_name).await?;
409 store.append_to_table(&table, vec![batch]).await?;
410 Ok(table)
411 } else {
412 store.create_table(&table_name, vec![batch]).await
413 }
414 }
415
416 pub async fn ensure_eid_index_lancedb(&self, table: &Table) -> Result<()> {
418 let indices = table
419 .list_indices()
420 .await
421 .map_err(|e| anyhow!("Failed to list indices: {}", e))?;
422
423 if !indices
424 .iter()
425 .any(|idx| idx.columns.contains(&"eid".to_string()))
426 {
427 log::info!(
428 "Creating eid BTree index for edge type '{}' via LanceDB",
429 self.edge_type
430 );
431 if let Err(e) = table
432 .create_index(&["eid"], LanceDbIndex::BTree(BTreeIndexBuilder::default()))
433 .execute()
434 .await
435 {
436 log::warn!(
437 "Failed to create eid index for '{}' via LanceDB: {}",
438 self.edge_type,
439 e
440 );
441 }
442 }
443
444 Ok(())
445 }
446
    /// The LanceDB table name for this `(edge_type, direction)` pair, as
    /// produced by `LanceDbStore::delta_table_name`.
    pub fn lancedb_table_name(&self) -> String {
        LanceDbStore::delta_table_name(&self.edge_type, &self.direction)
    }
451
    /// Scans every delta entry via LanceDB, capped at
    /// [`DEFAULT_MAX_COMPACTION_ROWS`].
    ///
    /// See [`Self::scan_all_lancedb_with_limit`] for the limit semantics.
    pub async fn scan_all_lancedb(
        &self,
        store: &LanceDbStore,
        schema: &Schema,
    ) -> Result<Vec<L1Entry>> {
        self.scan_all_lancedb_with_limit(store, schema, DEFAULT_MAX_COMPACTION_ROWS)
            .await
    }
463
464 pub async fn scan_all_lancedb_with_limit(
466 &self,
467 store: &LanceDbStore,
468 schema: &Schema,
469 max_rows: usize,
470 ) -> Result<Vec<L1Entry>> {
471 let table = match self.open_lancedb(store).await {
472 Ok(t) => t,
473 Err(_) => return Ok(vec![]),
474 };
475
476 let row_count = table.count_rows(None).await?;
477 check_oom_guard(row_count, max_rows, &self.edge_type, &self.direction)?;
478
479 info!(
480 edge_type = %self.edge_type,
481 direction = %self.direction,
482 row_count,
483 estimated_bytes = row_count * ENTRY_SIZE_ESTIMATE,
484 "Starting delta scan for compaction (LanceDB)"
485 );
486
487 use lancedb::query::ExecutableQuery;
488 let stream = table.query().execute().await?;
489 let batches: Vec<arrow_array::RecordBatch> = stream.try_collect().await?;
490
491 let mut entries = Vec::new();
492 for batch in batches {
493 let mut batch_entries = self.parse_batch(&batch, schema)?;
494 entries.append(&mut batch_entries);
495 }
496
497 self.sort_entries(&mut entries);
498
499 Ok(entries)
500 }
501
    /// Atomically replaces the entire delta table's contents with `batch`,
    /// reusing the batch's own Arrow schema. Delegates to
    /// `LanceDbStore::replace_table_atomic`.
    pub async fn replace_lancedb(&self, store: &LanceDbStore, batch: RecordBatch) -> Result<Table> {
        let table_name = self.lancedb_table_name();
        let arrow_schema = batch.schema();
        store
            .replace_table_atomic(&table_name, vec![batch], arrow_schema)
            .await
    }
512
513 pub async fn read_deltas_lancedb(
517 &self,
518 store: &LanceDbStore,
519 vid: Vid,
520 schema: &Schema,
521 version_hwm: Option<u64>,
522 ) -> Result<Vec<L1Entry>> {
523 let table = match self.open_lancedb(store).await {
524 Ok(t) => t,
525 Err(_) => return Ok(vec![]),
526 };
527
528 use lancedb::query::{ExecutableQuery, QueryBase};
529
530 let base_filter = format!("{} = {}", self.filter_column(), vid.as_u64());
531
532 let final_filter = if let Some(hwm) = version_hwm {
534 format!("({}) AND (_version <= {})", base_filter, hwm)
535 } else {
536 base_filter
537 };
538
539 let query = table.query().only_if(final_filter);
540 let stream = query.execute().await?;
541 let batches: Vec<arrow_array::RecordBatch> = stream.try_collect().await?;
542
543 let mut entries = Vec::new();
544 for batch in batches {
545 let mut batch_entries = self.parse_batch(&batch, schema)?;
546 entries.append(&mut batch_entries);
547 }
548
549 Ok(entries)
550 }
551
552 pub async fn read_deltas_lancedb_batch(
557 &self,
558 store: &LanceDbStore,
559 vids: &[Vid],
560 schema: &Schema,
561 version_hwm: Option<u64>,
562 ) -> Result<HashMap<Vid, Vec<L1Entry>>> {
563 if vids.is_empty() {
564 return Ok(HashMap::new());
565 }
566
567 let table = match self.open_lancedb(store).await {
568 Ok(t) => t,
569 Err(_) => return Ok(HashMap::new()),
570 };
571
572 use lancedb::query::{ExecutableQuery, QueryBase};
573
574 let vid_list = vids
576 .iter()
577 .map(|v| v.as_u64().to_string())
578 .collect::<Vec<_>>()
579 .join(", ");
580 let mut filter = format!("{} IN ({})", self.filter_column(), vid_list);
581
582 if let Some(hwm) = version_hwm {
584 filter = format!("({}) AND (_version <= {})", filter, hwm);
585 }
586
587 let query = table.query().only_if(filter);
588 let stream = query.execute().await?;
589 let batches: Vec<arrow_array::RecordBatch> = stream.try_collect().await?;
590
591 let is_fwd = self.direction == "fwd";
593 let mut result: HashMap<Vid, Vec<L1Entry>> = HashMap::new();
594 for batch in batches {
595 let entries = self.parse_batch(&batch, schema)?;
596 for entry in entries {
597 let vid = if is_fwd { entry.src_vid } else { entry.dst_vid };
598 result.entry(vid).or_default().push(entry);
599 }
600 }
601
602 Ok(result)
603 }
604}
605
#[cfg(test)]
mod tests {
    use super::*;

    /// Sanity-checks the compaction constants against each other.
    #[test]
    #[allow(clippy::assertions_on_constants)]
    fn test_constants_are_reasonable() {
        assert_eq!(DEFAULT_MAX_COMPACTION_ROWS, 5_000_000);

        assert!(ENTRY_SIZE_ESTIMATE >= 100, "Entry size estimate too low");
        assert!(ENTRY_SIZE_ESTIMATE <= 300, "Entry size estimate too high");

        let bytes = DEFAULT_MAX_COMPACTION_ROWS * ENTRY_SIZE_ESTIMATE;
        let estimated_gb = bytes as f64 / (1024.0 * 1024.0 * 1024.0);
        assert!(
            estimated_gb < 1.0,
            "5M entries should fit in under 1GB with current estimate"
        );
    }

    /// Checks the GB projection used in the OOM-guard error message.
    #[test]
    fn test_memory_estimate_formatting() {
        let rows: usize = 10_000_000;
        let gb = (rows * ENTRY_SIZE_ESTIMATE) as f64 / (1024.0 * 1024.0 * 1024.0);
        assert!(gb > 1.0 && gb < 2.0, "10M rows should be 1-2 GB");
    }
}
642}