Skip to main content

omnigraph/db/manifest/
state.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use arrow_array::{Array, RecordBatch, StringArray, UInt64Array, new_null_array};
5use arrow_schema::{DataType, Field, Schema, SchemaRef};
6use futures::TryStreamExt;
7use lance::Dataset;
8
9use crate::error::{OmniError, Result};
10
11use super::layout::version_object_id;
12use super::metadata::TableVersionMetadata;
13use super::{OBJECT_TYPE_TABLE, OBJECT_TYPE_TABLE_TOMBSTONE, OBJECT_TYPE_TABLE_VERSION};
14
15#[derive(Debug, Clone)]
16pub struct SubTableEntry {
17    pub table_key: String,
18    pub table_path: String,
19    pub table_version: u64,
20    pub table_branch: Option<String>,
21    pub row_count: u64,
22    pub(crate) version_metadata: TableVersionMetadata,
23}
24
25#[derive(Debug, Clone)]
26pub(super) struct ManifestState {
27    pub(super) version: u64,
28    pub(super) entries: Vec<SubTableEntry>,
29}
30
/// Tombstone decoded from an `OBJECT_TYPE_TABLE_TOMBSTONE` manifest row.
///
/// In `read_manifest_state`, a tombstone hides the newest version of
/// `table_key` when that version is less than or equal to `tombstone_version`.
#[derive(Clone, Debug)]
struct TableTombstoneEntry {
    /// Key of the sub-table the tombstone applies to.
    table_key: String,
    /// Value of the row's `table_version` column.
    tombstone_version: u64,
}
36
37#[derive(Debug, Clone)]
38struct ManifestScan {
39    table_locations: HashMap<String, String>,
40    version_entries: Vec<SubTableEntry>,
41    tombstones: Vec<TableTombstoneEntry>,
42}
43
44pub(super) fn manifest_schema() -> SchemaRef {
45    // `object_id` is the merge-insert join key in the publisher; marking it as
46    // Lance's unenforced primary key engages row-level CAS at commit time, so
47    // two concurrent writers that try to land the same `object_id` row are
48    // detected by Lance via bloom-filter intersection (see
49    // `.context/merge-insert-cas-granularity.md`). Without this metadata,
50    // Lance's conflict resolver would silently rebase both writers' new
51    // fragments and admit duplicate rows.
52    let object_id_metadata: HashMap<String, String> =
53        [("lance-schema:unenforced-primary-key", "true")]
54            .into_iter()
55            .map(|(k, v)| (k.to_string(), v.to_string()))
56            .collect();
57    Arc::new(Schema::new(vec![
58        Field::new("object_id", DataType::Utf8, false).with_metadata(object_id_metadata),
59        Field::new("object_type", DataType::Utf8, false),
60        Field::new("location", DataType::Utf8, true),
61        Field::new("metadata", DataType::Utf8, true),
62        Field::new(
63            "base_objects",
64            DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
65            true,
66        ),
67        Field::new("table_key", DataType::Utf8, false),
68        Field::new("table_version", DataType::UInt64, true),
69        Field::new("table_branch", DataType::Utf8, true),
70        Field::new("row_count", DataType::UInt64, true),
71    ]))
72}
73
74pub(super) async fn read_manifest_state(dataset: &Dataset) -> Result<ManifestState> {
75    let version = dataset.version().version;
76    let scan = read_manifest_scan(dataset).await?;
77    let mut latest_versions = HashMap::<String, SubTableEntry>::new();
78
79    for entry in scan.version_entries {
80        match latest_versions.get(&entry.table_key) {
81            Some(existing) if existing.table_version >= entry.table_version => {}
82            _ => {
83                latest_versions.insert(entry.table_key.clone(), entry);
84            }
85        }
86    }
87
88    let mut tombstones = HashMap::<String, u64>::new();
89    for tombstone in scan.tombstones {
90        match tombstones.get(&tombstone.table_key) {
91            Some(existing) if *existing >= tombstone.tombstone_version => {}
92            _ => {
93                tombstones.insert(tombstone.table_key, tombstone.tombstone_version);
94            }
95        }
96    }
97
98    let mut entries: Vec<SubTableEntry> = latest_versions
99        .into_values()
100        .filter(|entry| {
101            tombstones
102                .get(&entry.table_key)
103                .map(|tombstone_version| *tombstone_version < entry.table_version)
104                .unwrap_or(true)
105        })
106        .collect();
107    entries.sort_by(|a, b| a.table_key.cmp(&b.table_key));
108
109    Ok(ManifestState { version, entries })
110}
111
112pub(super) async fn read_manifest_entries(dataset: &Dataset) -> Result<Vec<SubTableEntry>> {
113    Ok(read_manifest_scan(dataset).await?.version_entries)
114}
115
116pub(super) async fn read_registered_table_locations(
117    dataset: &Dataset,
118) -> Result<HashMap<String, String>> {
119    Ok(read_manifest_scan(dataset).await?.table_locations)
120}
121
122pub(super) async fn read_tombstone_versions(
123    dataset: &Dataset,
124) -> Result<HashMap<(String, u64), ()>> {
125    Ok(read_manifest_scan(dataset)
126        .await?
127        .tombstones
128        .into_iter()
129        .map(|tombstone| ((tombstone.table_key, tombstone.tombstone_version), ()))
130        .collect())
131}
132
133async fn read_manifest_scan(dataset: &Dataset) -> Result<ManifestScan> {
134    let batches: Vec<RecordBatch> = dataset
135        .scan()
136        .try_into_stream()
137        .await
138        .map_err(|e| OmniError::Lance(e.to_string()))?
139        .try_collect()
140        .await
141        .map_err(|e| OmniError::Lance(e.to_string()))?;
142
143    let mut table_locations = HashMap::new();
144    let mut version_entries = Vec::new();
145    let mut tombstones = Vec::new();
146
147    for batch in &batches {
148        let object_types = string_column(batch, "object_type")?;
149        let locations = string_column(batch, "location")?;
150        let metadata = string_column(batch, "metadata")?;
151        let table_keys = string_column(batch, "table_key")?;
152        let versions = u64_column(batch, "table_version")?;
153        let branches = string_column(batch, "table_branch")?;
154        let row_counts = u64_column(batch, "row_count")?;
155
156        for row in 0..batch.num_rows() {
157            let table_key = table_keys.value(row).to_string();
158            match object_types.value(row) {
159                OBJECT_TYPE_TABLE => {
160                    if locations.is_null(row) {
161                        return Err(OmniError::manifest_internal(format!(
162                            "manifest table row missing location for {}",
163                            table_key
164                        )));
165                    }
166                    table_locations.insert(table_key, locations.value(row).to_string());
167                }
168                OBJECT_TYPE_TABLE_VERSION => {
169                    let table_version = required_u64(versions, row, "table_version")?;
170                    let row_count = required_u64(row_counts, row, "row_count")?;
171                    if metadata.is_null(row) {
172                        return Err(OmniError::manifest_internal(format!(
173                            "manifest table_version row missing metadata for {}",
174                            table_key
175                        )));
176                    }
177                    let table_branch = if branches.is_null(row) {
178                        None
179                    } else {
180                        Some(branches.value(row).to_string())
181                    };
182                    version_entries.push(SubTableEntry {
183                        table_key: table_key.clone(),
184                        table_path: String::new(),
185                        table_version,
186                        table_branch,
187                        row_count,
188                        version_metadata: TableVersionMetadata::from_json_str(metadata.value(row))?,
189                    });
190                }
191                OBJECT_TYPE_TABLE_TOMBSTONE => {
192                    let tombstone_version = required_u64(versions, row, "table_version")?;
193                    tombstones.push(TableTombstoneEntry {
194                        table_key,
195                        tombstone_version,
196                    });
197                }
198                _ => {}
199            }
200        }
201    }
202
203    let mut entries = version_entries
204        .into_iter()
205        .map(|mut entry| {
206            entry.table_path = table_locations
207                .get(&entry.table_key)
208                .cloned()
209                .ok_or_else(|| {
210                    OmniError::manifest_internal(format!(
211                        "manifest missing table row for {}",
212                        entry.table_key
213                    ))
214                })?;
215            Ok(entry)
216        })
217        .collect::<Result<Vec<_>>>()?;
218    entries.sort_by(|a, b| {
219        a.table_key
220            .cmp(&b.table_key)
221            .then(a.table_version.cmp(&b.table_version))
222    });
223
224    Ok(ManifestScan {
225        table_locations,
226        version_entries: entries,
227        tombstones,
228    })
229}
230
231pub(super) fn entries_to_batch(
232    entries: &[SubTableEntry],
233    version_metadata: &HashMap<String, String>,
234) -> Result<RecordBatch> {
235    let mut object_ids = Vec::with_capacity(entries.len() * 2);
236    let mut object_types = Vec::with_capacity(entries.len() * 2);
237    let mut locations = Vec::with_capacity(entries.len() * 2);
238    let mut metadata = Vec::with_capacity(entries.len() * 2);
239    let mut table_keys = Vec::with_capacity(entries.len() * 2);
240    let mut table_versions = Vec::with_capacity(entries.len() * 2);
241    let mut table_branches = Vec::with_capacity(entries.len() * 2);
242    let mut row_counts = Vec::with_capacity(entries.len() * 2);
243
244    for entry in entries {
245        object_ids.push(entry.table_key.clone());
246        object_types.push(OBJECT_TYPE_TABLE.to_string());
247        locations.push(Some(entry.table_path.clone()));
248        metadata.push(None);
249        table_keys.push(entry.table_key.clone());
250        table_versions.push(None);
251        table_branches.push(None);
252        row_counts.push(None);
253
254        object_ids.push(version_object_id(&entry.table_key, entry.table_version));
255        object_types.push(OBJECT_TYPE_TABLE_VERSION.to_string());
256        locations.push(None);
257        metadata.push(Some(
258            version_metadata
259                .get(&entry.table_key)
260                .cloned()
261                .ok_or_else(|| {
262                    OmniError::manifest_internal(format!(
263                        "missing initial version metadata for {}",
264                        entry.table_key
265                    ))
266                })?,
267        ));
268        table_keys.push(entry.table_key.clone());
269        table_versions.push(Some(entry.table_version));
270        table_branches.push(entry.table_branch.clone());
271        row_counts.push(Some(entry.row_count));
272    }
273
274    manifest_rows_batch(
275        object_ids,
276        object_types,
277        locations,
278        metadata,
279        table_keys,
280        table_versions,
281        table_branches,
282        row_counts,
283    )
284}
285
286pub(super) fn manifest_rows_batch(
287    object_ids: Vec<String>,
288    object_types: Vec<String>,
289    locations: Vec<Option<String>>,
290    metadata: Vec<Option<String>>,
291    table_keys: Vec<String>,
292    table_versions: Vec<Option<u64>>,
293    table_branches: Vec<Option<String>>,
294    row_counts: Vec<Option<u64>>,
295) -> Result<RecordBatch> {
296    let len = object_ids.len();
297    RecordBatch::try_new(
298        manifest_schema(),
299        vec![
300            Arc::new(StringArray::from(object_ids)),
301            Arc::new(StringArray::from(object_types)),
302            Arc::new(StringArray::from(locations)),
303            Arc::new(StringArray::from(metadata)),
304            new_null_array(
305                &DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
306                len,
307            ),
308            Arc::new(StringArray::from(table_keys)),
309            Arc::new(UInt64Array::from(table_versions)),
310            Arc::new(StringArray::from(table_branches)),
311            Arc::new(UInt64Array::from(row_counts)),
312        ],
313    )
314    .map_err(|e| OmniError::Lance(e.to_string()))
315}
316
317pub(super) fn string_column<'a>(batch: &'a RecordBatch, name: &str) -> Result<&'a StringArray> {
318    batch
319        .column_by_name(name)
320        .ok_or_else(|| {
321            OmniError::manifest_internal(format!("manifest batch missing '{name}' column"))
322        })?
323        .as_any()
324        .downcast_ref::<StringArray>()
325        .ok_or_else(|| {
326            OmniError::manifest_internal(format!("manifest column '{name}' is not Utf8"))
327        })
328}
329
330fn u64_column<'a>(batch: &'a RecordBatch, name: &str) -> Result<&'a UInt64Array> {
331    batch
332        .column_by_name(name)
333        .ok_or_else(|| {
334            OmniError::manifest_internal(format!("manifest batch missing '{name}' column"))
335        })?
336        .as_any()
337        .downcast_ref::<UInt64Array>()
338        .ok_or_else(|| {
339            OmniError::manifest_internal(format!("manifest column '{name}' is not UInt64"))
340        })
341}
342
343fn required_u64(column: &UInt64Array, row: usize, name: &str) -> Result<u64> {
344    if column.is_null(row) {
345        return Err(OmniError::manifest_internal(format!(
346            "manifest column '{name}' is null at row {row}"
347        )));
348    }
349    Ok(column.value(row))
350}