1use crate::backend::StorageBackend;
7use crate::backend::table_names;
8use crate::backend::types::{ScalarIndexType, ScanRequest, WriteMode};
9use crate::storage::arrow_convert::build_timestamp_column;
10use crate::storage::property_builder::PropertyColumnBuilder;
11use crate::storage::value_codec::CrdtDecodeMode;
12use anyhow::{Result, anyhow};
13use arrow_array::types::TimestampNanosecondType;
14use arrow_array::{Array, ArrayRef, PrimitiveArray, RecordBatch, UInt8Array, UInt64Array};
15use arrow_schema::{Field, Schema as ArrowSchema, TimeUnit};
16#[cfg(feature = "lance-backend")]
17use futures::TryStreamExt;
18#[cfg(feature = "lance-backend")]
19use lance::dataset::Dataset;
20use std::collections::HashMap;
21use std::sync::Arc;
22use tracing::info;
23use uni_common::DataType;
24use uni_common::Properties;
25use uni_common::core::id::{Eid, Vid};
26use uni_common::core::schema::Schema;
27
/// Default cap on the number of delta rows materialized in memory during a
/// single compaction scan (enforced by `check_oom_guard`).
pub const DEFAULT_MAX_COMPACTION_ROWS: usize = 5_000_000;

/// Rough per-row in-memory size estimate in bytes for an `L1Entry`; used only
/// for the estimated-footprint figures in logs and the OOM-guard error message.
pub const ENTRY_SIZE_ESTIMATE: usize = 145;

37pub fn check_oom_guard(
42 row_count: usize,
43 max_rows: usize,
44 entity_name: &str,
45 qualifier: &str,
46) -> Result<()> {
47 if row_count > max_rows {
48 let estimated_bytes = row_count * ENTRY_SIZE_ESTIMATE;
49 return Err(anyhow!(
50 "Table for {}_{} has {} rows (estimated {:.2} GB in memory), exceeding max_compaction_rows limit of {}. \
51 Use chunked compaction or increase the limit. See issue #143.",
52 entity_name,
53 qualifier,
54 row_count,
55 estimated_bytes as f64 / (1024.0 * 1024.0 * 1024.0),
56 max_rows
57 ));
58 }
59 Ok(())
60}
61
/// Operation kind recorded on a delta row. The discriminants are persisted in
/// the `op` (`UInt8`) column, so they must remain stable.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Op {
    /// Edge insertion (stored as 0).
    Insert = 0,
    /// Edge deletion (stored as 1).
    Delete = 1,
}
70
/// One decoded delta row from a per-(edge type, direction) delta table.
#[derive(Clone, Debug)]
pub struct L1Entry {
    /// Source vertex id.
    pub src_vid: Vid,
    /// Destination vertex id.
    pub dst_vid: Vid,
    /// Edge id.
    pub eid: Eid,
    /// Whether this row records an insert or a delete.
    pub op: Op,
    /// Write version (`_version` column); scans sort and high-water-mark
    /// filter on it.
    pub version: u64,
    /// Edge properties decoded from the schema-declared property columns.
    pub properties: Properties,
    /// Creation timestamp (`_created_at`, nanoseconds, UTC), when present.
    pub created_at: Option<i64>,
    /// Last-update timestamp (`_updated_at`, nanoseconds, UTC), when present.
    pub updated_at: Option<i64>,
}
85
/// Handle to the delta table for one (edge type, direction) pair, offering
/// both a direct-Lance path and a `StorageBackend` path for reads and writes.
#[derive(Debug)]
pub struct DeltaDataset {
    // Dataset URI ("{base}/deltas/{edge_type}_{direction}"); only the
    // lance-backend code path reads it.
    #[cfg_attr(not(feature = "lance-backend"), allow(dead_code))]
    uri: String,
    // Edge type name; keys into `Schema::properties`.
    edge_type: String,
    // Direction tag: "fwd" sorts/filters on src_vid, any other value on dst_vid.
    direction: String,
}
97
98impl DeltaDataset {
99 pub fn new(base_uri: &str, edge_type: &str, direction: &str) -> Self {
101 let uri = format!("{}/deltas/{}_{}", base_uri, edge_type, direction);
102 Self {
103 uri,
104 edge_type: edge_type.to_string(),
105 direction: direction.to_string(),
106 }
107 }
108
    /// Opens the Lance dataset at its latest version.
    #[cfg(feature = "lance-backend")]
    pub async fn open(&self) -> Result<Arc<Dataset>> {
        self.open_at(None).await
    }
114
    /// Opens the Lance dataset, optionally checked out at dataset `version`
    /// (time travel); `None` means the latest version.
    #[cfg(feature = "lance-backend")]
    pub async fn open_at(&self, version: Option<u64>) -> Result<Arc<Dataset>> {
        let mut ds = Dataset::open(&self.uri).await?;
        if let Some(v) = version {
            ds = ds.checkout_version(v).await?;
        }
        Ok(Arc::new(ds))
    }
124
125 pub fn get_arrow_schema(&self, schema: &Schema) -> Result<Arc<ArrowSchema>> {
127 let mut fields = vec![
128 Field::new("src_vid", arrow_schema::DataType::UInt64, false),
129 Field::new("dst_vid", arrow_schema::DataType::UInt64, false),
130 Field::new("eid", arrow_schema::DataType::UInt64, false),
131 Field::new("op", arrow_schema::DataType::UInt8, false), Field::new("_version", arrow_schema::DataType::UInt64, false),
133 Field::new(
135 "_created_at",
136 arrow_schema::DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
137 true,
138 ),
139 Field::new(
140 "_updated_at",
141 arrow_schema::DataType::Timestamp(TimeUnit::Nanosecond, Some("UTC".into())),
142 true,
143 ),
144 ];
145
146 if let Some(type_props) = schema.properties.get(&self.edge_type) {
147 let mut sorted_props: Vec<_> = type_props.iter().collect();
148 sorted_props.sort_by_key(|(name, _)| *name);
149
150 for (name, meta) in sorted_props {
151 fields.push(Field::new(name, meta.r#type.to_arrow(), meta.nullable));
152 }
153 }
154
155 fields.push(Field::new(
157 "overflow_json",
158 arrow_schema::DataType::LargeBinary,
159 true,
160 ));
161
162 Ok(Arc::new(ArrowSchema::new(fields)))
163 }
164
165 pub fn build_record_batch(&self, entries: &[L1Entry], schema: &Schema) -> Result<RecordBatch> {
167 let arrow_schema = self.get_arrow_schema(schema)?;
168
169 let mut src_vids = Vec::with_capacity(entries.len());
170 let mut dst_vids = Vec::with_capacity(entries.len());
171 let mut eids = Vec::with_capacity(entries.len());
172 let mut ops = Vec::with_capacity(entries.len());
173 let mut versions = Vec::with_capacity(entries.len());
174
175 for entry in entries {
176 src_vids.push(entry.src_vid.as_u64());
177 dst_vids.push(entry.dst_vid.as_u64());
178 eids.push(entry.eid.as_u64());
179 ops.push(entry.op as u8);
180 versions.push(entry.version);
181 }
182
183 let mut columns: Vec<ArrayRef> = vec![
184 Arc::new(UInt64Array::from(src_vids)),
185 Arc::new(UInt64Array::from(dst_vids)),
186 Arc::new(UInt64Array::from(eids)),
187 Arc::new(UInt8Array::from(ops)),
188 Arc::new(UInt64Array::from(versions)),
189 ];
190
191 columns.push(build_timestamp_column(entries.iter().map(|e| e.created_at)));
193 columns.push(build_timestamp_column(entries.iter().map(|e| e.updated_at)));
194
195 let deleted_flags: Vec<bool> = entries.iter().map(|e| e.op == Op::Delete).collect();
198
199 let prop_columns = PropertyColumnBuilder::new(schema, &self.edge_type, entries.len())
201 .with_deleted(&deleted_flags)
202 .build(|i| &entries[i].properties)?;
203
204 columns.extend(prop_columns);
205
206 let overflow_column = self.build_overflow_json_column(entries, schema)?;
208 columns.push(overflow_column);
209
210 RecordBatch::try_new(arrow_schema, columns).map_err(|e| anyhow!(e))
211 }
212
    /// Builds the `overflow_json` LargeBinary column for `entries` by
    /// delegating to the shared property-builder helper.
    fn build_overflow_json_column(&self, entries: &[L1Entry], schema: &Schema) -> Result<ArrayRef> {
        crate::storage::property_builder::build_overflow_json_column(
            entries.len(),
            &self.edge_type,
            schema,
            |i| &entries[i].properties,
            // NOTE(review): empty slice — presumably "no excluded property
            // names"; confirm against the helper's signature.
            &[],
        )
    }
223
    /// Scans every delta row via the Lance path, applying the default
    /// `DEFAULT_MAX_COMPACTION_ROWS` OOM-guard limit.
    #[cfg(feature = "lance-backend")]
    pub async fn scan_all(&self, schema: &Schema) -> Result<Vec<L1Entry>> {
        self.scan_all_with_limit(schema, DEFAULT_MAX_COMPACTION_ROWS)
            .await
    }
230
    /// Scans every delta row via the Lance path, guarded by `max_rows`.
    ///
    /// Entries are returned sorted by (direction key, `_version`) via
    /// `sort_entries`. A dataset that cannot be opened yields an empty vec.
    #[cfg(feature = "lance-backend")]
    pub async fn scan_all_with_limit(
        &self,
        schema: &Schema,
        max_rows: usize,
    ) -> Result<Vec<L1Entry>> {
        // NOTE(review): every open failure — not just a missing dataset — is
        // treated as "no deltas"; confirm that e.g. corruption should not
        // surface as an error here.
        let ds = match self.open().await {
            Ok(ds) => ds,
            Err(_) => return Ok(vec![]),
        };

        // Refuse to materialize unreasonably large tables in memory.
        let row_count = ds.count_rows(None).await?;
        check_oom_guard(row_count, max_rows, &self.edge_type, &self.direction)?;

        info!(
            edge_type = %self.edge_type,
            direction = %self.direction,
            row_count,
            estimated_bytes = row_count * ENTRY_SIZE_ESTIMATE,
            "Starting delta scan for compaction"
        );

        // Stream record batches and decode them incrementally.
        let mut stream = ds.scan().try_into_stream().await?;

        let mut entries = Vec::new();

        while let Some(batch) = stream.try_next().await? {
            let mut batch_entries = self.parse_batch(&batch, schema)?;
            entries.append(&mut batch_entries);
        }

        self.sort_entries(&mut entries);

        Ok(entries)
    }
267
268 fn sort_entries(&self, entries: &mut [L1Entry]) {
270 let is_fwd = self.direction == "fwd";
271 entries.sort_by(|a, b| {
272 let key_a = if is_fwd { a.src_vid } else { a.dst_vid };
273 let key_b = if is_fwd { b.src_vid } else { b.dst_vid };
274 key_a.cmp(&key_b).then(a.version.cmp(&b.version))
275 });
276 }
277
278 fn parse_batch(&self, batch: &RecordBatch, schema: &Schema) -> Result<Vec<L1Entry>> {
279 let src_vids = batch
280 .column_by_name("src_vid")
281 .ok_or(anyhow!("Missing src_vid"))?
282 .as_any()
283 .downcast_ref::<UInt64Array>()
284 .ok_or(anyhow!("Invalid src_vid type"))?;
285 let dst_vids = batch
286 .column_by_name("dst_vid")
287 .ok_or(anyhow!("Missing dst_vid"))?
288 .as_any()
289 .downcast_ref::<UInt64Array>()
290 .ok_or(anyhow!("Invalid dst_vid type"))?;
291 let eids = batch
292 .column_by_name("eid")
293 .ok_or(anyhow!("Missing eid"))?
294 .as_any()
295 .downcast_ref::<UInt64Array>()
296 .ok_or(anyhow!("Invalid eid type"))?;
297 let ops = batch
298 .column_by_name("op")
299 .ok_or(anyhow!("Missing op"))?
300 .as_any()
301 .downcast_ref::<UInt8Array>()
302 .ok_or(anyhow!("Invalid op type"))?;
303 let versions = batch
304 .column_by_name("_version")
305 .ok_or(anyhow!("Missing _version"))?
306 .as_any()
307 .downcast_ref::<UInt64Array>()
308 .ok_or(anyhow!("Invalid _version type"))?;
309
310 let created_at_col = batch.column_by_name("_created_at").and_then(|c| {
312 c.as_any()
313 .downcast_ref::<PrimitiveArray<TimestampNanosecondType>>()
314 });
315 let updated_at_col = batch.column_by_name("_updated_at").and_then(|c| {
316 c.as_any()
317 .downcast_ref::<PrimitiveArray<TimestampNanosecondType>>()
318 });
319
320 let mut prop_cols = Vec::new();
322 if let Some(type_props) = schema.properties.get(&self.edge_type) {
323 for (name, meta) in type_props {
324 if let Some(col) = batch.column_by_name(name) {
325 prop_cols.push((name, meta.r#type.clone(), col));
326 }
327 }
328 }
329
330 let mut entries = Vec::with_capacity(batch.num_rows());
331
332 for i in 0..batch.num_rows() {
333 let op = match ops.value(i) {
334 0 => Op::Insert,
335 1 => Op::Delete,
336 _ => continue, };
338
339 let properties = self.extract_properties(&prop_cols, i)?;
340
341 let read_ts = |col: Option<&PrimitiveArray<TimestampNanosecondType>>| {
343 col.and_then(|c| (!c.is_null(i)).then(|| c.value(i)))
344 };
345 let created_at = read_ts(created_at_col);
346 let updated_at = read_ts(updated_at_col);
347
348 entries.push(L1Entry {
349 src_vid: Vid::from(src_vids.value(i)),
350 dst_vid: Vid::from(dst_vids.value(i)),
351 eid: Eid::from(eids.value(i)),
352 op,
353 version: versions.value(i),
354 properties,
355 created_at,
356 updated_at,
357 });
358 }
359 Ok(entries)
360 }
361
362 fn extract_properties(
364 &self,
365 prop_cols: &[(&String, DataType, &ArrayRef)],
366 row: usize,
367 ) -> Result<Properties> {
368 let mut properties = Properties::new();
369 for (name, dtype, col) in prop_cols {
370 if col.is_null(row) {
371 continue;
372 }
373 let val = Self::value_from_column(col.as_ref(), dtype, row)?;
374 properties.insert(name.to_string(), uni_common::Value::from(val));
375 }
376 Ok(properties)
377 }
378
    /// Decodes a single cell into a JSON value, delegating to the shared
    /// `value_codec` helper with lenient CRDT decoding.
    fn value_from_column(
        col: &dyn arrow_array::Array,
        dtype: &uni_common::DataType,
        row: usize,
    ) -> Result<serde_json::Value> {
        crate::storage::value_codec::value_from_column(col, dtype, row, CrdtDecodeMode::Lenient)
    }
387
388 fn filter_column(&self) -> &'static str {
390 if self.direction == "fwd" {
391 "src_vid"
392 } else {
393 "dst_vid"
394 }
395 }
396
    /// Ensures the backing delta table exists in `backend`, creating it with
    /// the Arrow schema derived from `schema` when missing.
    pub async fn open_or_create(
        &self,
        backend: &dyn StorageBackend,
        schema: &Schema,
    ) -> Result<()> {
        let table_name = table_names::delta_table_name(&self.edge_type, &self.direction);
        let arrow_schema = self.get_arrow_schema(schema)?;
        backend
            .open_or_create_table(&table_name, arrow_schema)
            .await
    }
413
414 pub async fn write_run(&self, backend: &dyn StorageBackend, batch: RecordBatch) -> Result<()> {
418 let table_name = table_names::delta_table_name(&self.edge_type, &self.direction);
419 if backend.table_exists(&table_name).await? {
420 backend
421 .write(&table_name, vec![batch], WriteMode::Append)
422 .await
423 } else {
424 backend.create_table(&table_name, vec![batch]).await
425 }
426 }
427
428 pub async fn ensure_eid_index(&self, backend: &dyn StorageBackend) -> Result<()> {
430 let table_name = table_names::delta_table_name(&self.edge_type, &self.direction);
431 let indices = backend.list_indexes(&table_name).await?;
432
433 if !indices
434 .iter()
435 .any(|idx| idx.columns.contains(&"eid".to_string()))
436 {
437 log::info!(
438 "Creating eid BTree index for edge type '{}'",
439 self.edge_type
440 );
441 if let Err(e) = backend
442 .create_scalar_index(&table_name, "eid", ScalarIndexType::BTree)
443 .await
444 {
445 log::warn!("Failed to create eid index for '{}': {}", self.edge_type, e);
446 }
447 }
448
449 Ok(())
450 }
451
    /// Returns the backend table name for this (edge type, direction) pair.
    pub fn table_name(&self) -> String {
        table_names::delta_table_name(&self.edge_type, &self.direction)
    }
456
    /// Scans every delta row through the storage-backend abstraction,
    /// applying the default `DEFAULT_MAX_COMPACTION_ROWS` OOM-guard limit.
    pub async fn scan_all_backend(
        &self,
        backend: &dyn StorageBackend,
        schema: &Schema,
    ) -> Result<Vec<L1Entry>> {
        self.scan_all_backend_with_limit(backend, schema, DEFAULT_MAX_COMPACTION_ROWS)
            .await
    }
468
469 pub async fn scan_all_backend_with_limit(
471 &self,
472 backend: &dyn StorageBackend,
473 schema: &Schema,
474 max_rows: usize,
475 ) -> Result<Vec<L1Entry>> {
476 let table_name = table_names::delta_table_name(&self.edge_type, &self.direction);
477
478 if !backend.table_exists(&table_name).await? {
479 return Ok(vec![]);
480 }
481
482 let row_count = backend.count_rows(&table_name, None).await?;
483 check_oom_guard(row_count, max_rows, &self.edge_type, &self.direction)?;
484
485 info!(
486 edge_type = %self.edge_type,
487 direction = %self.direction,
488 row_count,
489 estimated_bytes = row_count * ENTRY_SIZE_ESTIMATE,
490 "Starting delta scan for compaction (backend)"
491 );
492
493 let batches = backend.scan(ScanRequest::all(&table_name)).await?;
494
495 let mut entries = Vec::new();
496 for batch in batches {
497 let mut batch_entries = self.parse_batch(&batch, schema)?;
498 entries.append(&mut batch_entries);
499 }
500
501 self.sort_entries(&mut entries);
502
503 Ok(entries)
504 }
505
    /// Atomically replaces the entire table contents with `batch`, reusing
    /// the batch's own schema for the new table.
    pub async fn replace(&self, backend: &dyn StorageBackend, batch: RecordBatch) -> Result<()> {
        let table_name = self.table_name();
        let arrow_schema = batch.schema();
        backend
            .replace_table_atomic(&table_name, vec![batch], arrow_schema)
            .await
    }
516
517 pub async fn read_deltas(
521 &self,
522 backend: &dyn StorageBackend,
523 vid: Vid,
524 schema: &Schema,
525 version_hwm: Option<u64>,
526 ) -> Result<Vec<L1Entry>> {
527 let table_name = table_names::delta_table_name(&self.edge_type, &self.direction);
528
529 if !backend.table_exists(&table_name).await? {
530 return Ok(vec![]);
531 }
532
533 let base_filter = format!("{} = {}", self.filter_column(), vid.as_u64());
534
535 let final_filter = if let Some(hwm) = version_hwm {
537 format!("({}) AND (_version <= {})", base_filter, hwm)
538 } else {
539 base_filter
540 };
541
542 let batches = backend
543 .scan(ScanRequest::all(&table_name).with_filter(final_filter))
544 .await?;
545
546 let mut entries = Vec::new();
547 for batch in batches {
548 let mut batch_entries = self.parse_batch(&batch, schema)?;
549 entries.append(&mut batch_entries);
550 }
551
552 Ok(entries)
553 }
554
555 pub async fn read_deltas_batch(
560 &self,
561 backend: &dyn StorageBackend,
562 vids: &[Vid],
563 schema: &Schema,
564 version_hwm: Option<u64>,
565 ) -> Result<HashMap<Vid, Vec<L1Entry>>> {
566 if vids.is_empty() {
567 return Ok(HashMap::new());
568 }
569
570 let table_name = table_names::delta_table_name(&self.edge_type, &self.direction);
571
572 if !backend.table_exists(&table_name).await? {
573 return Ok(HashMap::new());
574 }
575
576 let vid_list = vids
578 .iter()
579 .map(|v| v.as_u64().to_string())
580 .collect::<Vec<_>>()
581 .join(", ");
582 let mut filter = format!("{} IN ({})", self.filter_column(), vid_list);
583
584 if let Some(hwm) = version_hwm {
586 filter = format!("({}) AND (_version <= {})", filter, hwm);
587 }
588
589 let batches = backend
590 .scan(ScanRequest::all(&table_name).with_filter(filter))
591 .await?;
592
593 let is_fwd = self.direction == "fwd";
595 let mut result: HashMap<Vid, Vec<L1Entry>> = HashMap::new();
596 for batch in batches {
597 let entries = self.parse_batch(&batch, schema)?;
598 for entry in entries {
599 let vid = if is_fwd { entry.src_vid } else { entry.dst_vid };
600 result.entry(vid).or_default().push(entry);
601 }
602 }
603
604 Ok(result)
605 }
606}
607
#[cfg(test)]
mod tests {
    use super::*;

    // Sanity checks on the compaction tuning constants.
    #[test]
    #[expect(
        clippy::assertions_on_constants,
        reason = "Validating configuration constants intentionally"
    )]
    fn test_constants_are_reasonable() {
        assert_eq!(DEFAULT_MAX_COMPACTION_ROWS, 5_000_000);

        assert!(ENTRY_SIZE_ESTIMATE >= 100, "Entry size estimate too low");
        assert!(ENTRY_SIZE_ESTIMATE <= 300, "Entry size estimate too high");

        // With the current estimate, a full default-sized scan stays under 1 GB.
        let estimated_gb =
            (DEFAULT_MAX_COMPACTION_ROWS * ENTRY_SIZE_ESTIMATE) as f64 / (1024.0 * 1024.0 * 1024.0);
        assert!(
            estimated_gb < 1.0,
            "5M entries should fit in under 1GB with current estimate"
        );
    }

    // Mirrors the GB arithmetic used in `check_oom_guard`'s error message.
    #[test]
    fn test_memory_estimate_formatting() {
        let row_count = 10_000_000;
        let estimated_bytes = row_count * ENTRY_SIZE_ESTIMATE;
        let estimated_gb = estimated_bytes as f64 / (1024.0 * 1024.0 * 1024.0);

        assert!(
            estimated_gb > 1.0 && estimated_gb < 2.0,
            "10M rows should be 1-2 GB"
        );
    }
}