Skip to main content

omnigraph/db/manifest/
state.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use arrow_array::{Array, RecordBatch, StringArray, UInt64Array, new_null_array};
5use arrow_schema::{DataType, Field, Schema, SchemaRef};
6use futures::TryStreamExt;
7use lance::Dataset;
8
9use crate::error::{OmniError, Result};
10
11use super::layout::version_object_id;
12use super::metadata::TableVersionMetadata;
13use super::{OBJECT_TYPE_TABLE, OBJECT_TYPE_TABLE_TOMBSTONE, OBJECT_TYPE_TABLE_VERSION};
14
15#[derive(Debug, Clone)]
16pub struct SubTableEntry {
17    pub table_key: String,
18    pub table_path: String,
19    pub table_version: u64,
20    pub table_branch: Option<String>,
21    pub row_count: u64,
22    pub(crate) version_metadata: TableVersionMetadata,
23}
24
25#[derive(Debug, Clone)]
26pub(super) struct ManifestState {
27    pub(super) version: u64,
28    pub(super) entries: Vec<SubTableEntry>,
29}
30
/// A manifest row of type `OBJECT_TYPE_TABLE_TOMBSTONE`.
#[derive(Debug, Clone)]
struct TableTombstoneEntry {
    /// Key of the table the tombstone applies to.
    table_key: String,
    /// Value of the row's `table_version` column.
    tombstone_version: u64,
}
36
37#[derive(Debug, Clone)]
38struct ManifestScan {
39    table_locations: HashMap<String, String>,
40    version_entries: Vec<SubTableEntry>,
41    tombstones: Vec<TableTombstoneEntry>,
42}
43
44pub(super) fn manifest_schema() -> SchemaRef {
45    Arc::new(Schema::new(vec![
46        Field::new("object_id", DataType::Utf8, false),
47        Field::new("object_type", DataType::Utf8, false),
48        Field::new("location", DataType::Utf8, true),
49        Field::new("metadata", DataType::Utf8, true),
50        Field::new(
51            "base_objects",
52            DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
53            true,
54        ),
55        Field::new("table_key", DataType::Utf8, false),
56        Field::new("table_version", DataType::UInt64, true),
57        Field::new("table_branch", DataType::Utf8, true),
58        Field::new("row_count", DataType::UInt64, true),
59    ]))
60}
61
62pub(super) async fn read_manifest_state(dataset: &Dataset) -> Result<ManifestState> {
63    let version = dataset.version().version;
64    let scan = read_manifest_scan(dataset).await?;
65    let mut latest_versions = HashMap::<String, SubTableEntry>::new();
66
67    for entry in scan.version_entries {
68        match latest_versions.get(&entry.table_key) {
69            Some(existing) if existing.table_version >= entry.table_version => {}
70            _ => {
71                latest_versions.insert(entry.table_key.clone(), entry);
72            }
73        }
74    }
75
76    let mut tombstones = HashMap::<String, u64>::new();
77    for tombstone in scan.tombstones {
78        match tombstones.get(&tombstone.table_key) {
79            Some(existing) if *existing >= tombstone.tombstone_version => {}
80            _ => {
81                tombstones.insert(tombstone.table_key, tombstone.tombstone_version);
82            }
83        }
84    }
85
86    let mut entries: Vec<SubTableEntry> = latest_versions
87        .into_values()
88        .filter(|entry| {
89            tombstones
90                .get(&entry.table_key)
91                .map(|tombstone_version| *tombstone_version < entry.table_version)
92                .unwrap_or(true)
93        })
94        .collect();
95    entries.sort_by(|a, b| a.table_key.cmp(&b.table_key));
96
97    Ok(ManifestState { version, entries })
98}
99
100pub(super) async fn read_manifest_entries(dataset: &Dataset) -> Result<Vec<SubTableEntry>> {
101    Ok(read_manifest_scan(dataset).await?.version_entries)
102}
103
104pub(super) async fn read_registered_table_locations(
105    dataset: &Dataset,
106) -> Result<HashMap<String, String>> {
107    Ok(read_manifest_scan(dataset).await?.table_locations)
108}
109
110pub(super) async fn read_tombstone_versions(
111    dataset: &Dataset,
112) -> Result<HashMap<(String, u64), ()>> {
113    Ok(read_manifest_scan(dataset)
114        .await?
115        .tombstones
116        .into_iter()
117        .map(|tombstone| ((tombstone.table_key, tombstone.tombstone_version), ()))
118        .collect())
119}
120
121async fn read_manifest_scan(dataset: &Dataset) -> Result<ManifestScan> {
122    let batches: Vec<RecordBatch> = dataset
123        .scan()
124        .try_into_stream()
125        .await
126        .map_err(|e| OmniError::Lance(e.to_string()))?
127        .try_collect()
128        .await
129        .map_err(|e| OmniError::Lance(e.to_string()))?;
130
131    let mut table_locations = HashMap::new();
132    let mut version_entries = Vec::new();
133    let mut tombstones = Vec::new();
134
135    for batch in &batches {
136        let object_types = string_column(batch, "object_type")?;
137        let locations = string_column(batch, "location")?;
138        let metadata = string_column(batch, "metadata")?;
139        let table_keys = string_column(batch, "table_key")?;
140        let versions = u64_column(batch, "table_version")?;
141        let branches = string_column(batch, "table_branch")?;
142        let row_counts = u64_column(batch, "row_count")?;
143
144        for row in 0..batch.num_rows() {
145            let table_key = table_keys.value(row).to_string();
146            match object_types.value(row) {
147                OBJECT_TYPE_TABLE => {
148                    if locations.is_null(row) {
149                        return Err(OmniError::manifest_internal(format!(
150                            "manifest table row missing location for {}",
151                            table_key
152                        )));
153                    }
154                    table_locations.insert(table_key, locations.value(row).to_string());
155                }
156                OBJECT_TYPE_TABLE_VERSION => {
157                    let table_version = required_u64(versions, row, "table_version")?;
158                    let row_count = required_u64(row_counts, row, "row_count")?;
159                    if metadata.is_null(row) {
160                        return Err(OmniError::manifest_internal(format!(
161                            "manifest table_version row missing metadata for {}",
162                            table_key
163                        )));
164                    }
165                    let table_branch = if branches.is_null(row) {
166                        None
167                    } else {
168                        Some(branches.value(row).to_string())
169                    };
170                    version_entries.push(SubTableEntry {
171                        table_key: table_key.clone(),
172                        table_path: String::new(),
173                        table_version,
174                        table_branch,
175                        row_count,
176                        version_metadata: TableVersionMetadata::from_json_str(metadata.value(row))?,
177                    });
178                }
179                OBJECT_TYPE_TABLE_TOMBSTONE => {
180                    let tombstone_version = required_u64(versions, row, "table_version")?;
181                    tombstones.push(TableTombstoneEntry {
182                        table_key,
183                        tombstone_version,
184                    });
185                }
186                _ => {}
187            }
188        }
189    }
190
191    let mut entries = version_entries
192        .into_iter()
193        .map(|mut entry| {
194            entry.table_path = table_locations
195                .get(&entry.table_key)
196                .cloned()
197                .ok_or_else(|| {
198                    OmniError::manifest_internal(format!(
199                        "manifest missing table row for {}",
200                        entry.table_key
201                    ))
202                })?;
203            Ok(entry)
204        })
205        .collect::<Result<Vec<_>>>()?;
206    entries.sort_by(|a, b| {
207        a.table_key
208            .cmp(&b.table_key)
209            .then(a.table_version.cmp(&b.table_version))
210    });
211
212    Ok(ManifestScan {
213        table_locations,
214        version_entries: entries,
215        tombstones,
216    })
217}
218
219pub(super) fn entries_to_batch(
220    entries: &[SubTableEntry],
221    version_metadata: &HashMap<String, String>,
222) -> Result<RecordBatch> {
223    let mut object_ids = Vec::with_capacity(entries.len() * 2);
224    let mut object_types = Vec::with_capacity(entries.len() * 2);
225    let mut locations = Vec::with_capacity(entries.len() * 2);
226    let mut metadata = Vec::with_capacity(entries.len() * 2);
227    let mut table_keys = Vec::with_capacity(entries.len() * 2);
228    let mut table_versions = Vec::with_capacity(entries.len() * 2);
229    let mut table_branches = Vec::with_capacity(entries.len() * 2);
230    let mut row_counts = Vec::with_capacity(entries.len() * 2);
231
232    for entry in entries {
233        object_ids.push(entry.table_key.clone());
234        object_types.push(OBJECT_TYPE_TABLE.to_string());
235        locations.push(Some(entry.table_path.clone()));
236        metadata.push(None);
237        table_keys.push(entry.table_key.clone());
238        table_versions.push(None);
239        table_branches.push(None);
240        row_counts.push(None);
241
242        object_ids.push(version_object_id(&entry.table_key, entry.table_version));
243        object_types.push(OBJECT_TYPE_TABLE_VERSION.to_string());
244        locations.push(None);
245        metadata.push(Some(
246            version_metadata
247                .get(&entry.table_key)
248                .cloned()
249                .ok_or_else(|| {
250                    OmniError::manifest_internal(format!(
251                        "missing initial version metadata for {}",
252                        entry.table_key
253                    ))
254                })?,
255        ));
256        table_keys.push(entry.table_key.clone());
257        table_versions.push(Some(entry.table_version));
258        table_branches.push(entry.table_branch.clone());
259        row_counts.push(Some(entry.row_count));
260    }
261
262    manifest_rows_batch(
263        object_ids,
264        object_types,
265        locations,
266        metadata,
267        table_keys,
268        table_versions,
269        table_branches,
270        row_counts,
271    )
272}
273
274pub(super) fn manifest_rows_batch(
275    object_ids: Vec<String>,
276    object_types: Vec<String>,
277    locations: Vec<Option<String>>,
278    metadata: Vec<Option<String>>,
279    table_keys: Vec<String>,
280    table_versions: Vec<Option<u64>>,
281    table_branches: Vec<Option<String>>,
282    row_counts: Vec<Option<u64>>,
283) -> Result<RecordBatch> {
284    let len = object_ids.len();
285    RecordBatch::try_new(
286        manifest_schema(),
287        vec![
288            Arc::new(StringArray::from(object_ids)),
289            Arc::new(StringArray::from(object_types)),
290            Arc::new(StringArray::from(locations)),
291            Arc::new(StringArray::from(metadata)),
292            new_null_array(
293                &DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
294                len,
295            ),
296            Arc::new(StringArray::from(table_keys)),
297            Arc::new(UInt64Array::from(table_versions)),
298            Arc::new(StringArray::from(table_branches)),
299            Arc::new(UInt64Array::from(row_counts)),
300        ],
301    )
302    .map_err(|e| OmniError::Lance(e.to_string()))
303}
304
305pub(super) fn string_column<'a>(batch: &'a RecordBatch, name: &str) -> Result<&'a StringArray> {
306    batch
307        .column_by_name(name)
308        .ok_or_else(|| {
309            OmniError::manifest_internal(format!("manifest batch missing '{name}' column"))
310        })?
311        .as_any()
312        .downcast_ref::<StringArray>()
313        .ok_or_else(|| {
314            OmniError::manifest_internal(format!("manifest column '{name}' is not Utf8"))
315        })
316}
317
318fn u64_column<'a>(batch: &'a RecordBatch, name: &str) -> Result<&'a UInt64Array> {
319    batch
320        .column_by_name(name)
321        .ok_or_else(|| {
322            OmniError::manifest_internal(format!("manifest batch missing '{name}' column"))
323        })?
324        .as_any()
325        .downcast_ref::<UInt64Array>()
326        .ok_or_else(|| {
327            OmniError::manifest_internal(format!("manifest column '{name}' is not UInt64"))
328        })
329}
330
331fn required_u64(column: &UInt64Array, row: usize, name: &str) -> Result<u64> {
332    if column.is_null(row) {
333        return Err(OmniError::manifest_internal(format!(
334            "manifest column '{name}' is null at row {row}"
335        )));
336    }
337    Ok(column.value(row))
338}