1use std::collections::HashMap;
2use std::sync::Arc;
3
4use arrow_array::{Array, RecordBatch, StringArray, UInt64Array, new_null_array};
5use arrow_schema::{DataType, Field, Schema, SchemaRef};
6use futures::TryStreamExt;
7use lance::Dataset;
8
9use crate::error::{OmniError, Result};
10
11use super::layout::version_object_id;
12use super::metadata::TableVersionMetadata;
13use super::{OBJECT_TYPE_TABLE, OBJECT_TYPE_TABLE_TOMBSTONE, OBJECT_TYPE_TABLE_VERSION};
14
/// One table-version row of the manifest, joined with the location of its table.
///
/// Decoded from `OBJECT_TYPE_TABLE_VERSION` rows when the manifest is scanned, and
/// serialized back into manifest rows by `entries_to_batch`.
#[derive(Debug, Clone)]
pub struct SubTableEntry {
    /// Key identifying the table; `entries_to_batch` also uses it as the `object_id`
    /// of the table row.
    pub table_key: String,
    /// Table location, copied from the `location` column of the matching
    /// `OBJECT_TYPE_TABLE` row during the manifest scan.
    pub table_path: String,
    /// Value of the row's `table_version` column.
    pub table_version: u64,
    /// Value of the nullable `table_branch` column; `None` when that column is null.
    pub table_branch: Option<String>,
    /// Value of the row's `row_count` column.
    pub row_count: u64,
    /// Metadata decoded from the row's `metadata` column via
    /// `TableVersionMetadata::from_json_str`.
    pub(crate) version_metadata: TableVersionMetadata,
}
24
/// Resolved view of the manifest, as produced by `read_manifest_state`.
#[derive(Debug, Clone)]
pub(super) struct ManifestState {
    /// Version number of the manifest dataset the state was read from.
    pub(super) version: u64,
    /// In `read_manifest_state`: the newest entry per table, excluding tables whose
    /// newest tombstone is at or after that entry's version, sorted by `table_key`.
    pub(super) entries: Vec<SubTableEntry>,
}
30
/// A decoded `OBJECT_TYPE_TABLE_TOMBSTONE` manifest row.
#[derive(Debug, Clone)]
struct TableTombstoneEntry {
    /// Key of the tombstoned table.
    table_key: String,
    /// Tombstone version, read from the row's `table_version` column.
    tombstone_version: u64,
}
36
/// Raw result of scanning every manifest row, before any latest-version or tombstone
/// filtering is applied.
#[derive(Debug, Clone)]
struct ManifestScan {
    /// `table_key` -> `location`, collected from `OBJECT_TYPE_TABLE` rows.
    table_locations: HashMap<String, String>,
    /// Every `OBJECT_TYPE_TABLE_VERSION` row with `table_path` filled in, sorted by
    /// `(table_key, table_version)`.
    version_entries: Vec<SubTableEntry>,
    /// Every `OBJECT_TYPE_TABLE_TOMBSTONE` row, in scan order.
    tombstones: Vec<TableTombstoneEntry>,
}
43
44pub(super) fn manifest_schema() -> SchemaRef {
45 let object_id_metadata: HashMap<String, String> =
53 [("lance-schema:unenforced-primary-key", "true")]
54 .into_iter()
55 .map(|(k, v)| (k.to_string(), v.to_string()))
56 .collect();
57 Arc::new(Schema::new(vec![
58 Field::new("object_id", DataType::Utf8, false).with_metadata(object_id_metadata),
59 Field::new("object_type", DataType::Utf8, false),
60 Field::new("location", DataType::Utf8, true),
61 Field::new("metadata", DataType::Utf8, true),
62 Field::new(
63 "base_objects",
64 DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
65 true,
66 ),
67 Field::new("table_key", DataType::Utf8, false),
68 Field::new("table_version", DataType::UInt64, true),
69 Field::new("table_branch", DataType::Utf8, true),
70 Field::new("row_count", DataType::UInt64, true),
71 ]))
72}
73
74pub(super) async fn read_manifest_state(dataset: &Dataset) -> Result<ManifestState> {
75 let version = dataset.version().version;
76 let scan = read_manifest_scan(dataset).await?;
77 let mut latest_versions = HashMap::<String, SubTableEntry>::new();
78
79 for entry in scan.version_entries {
80 match latest_versions.get(&entry.table_key) {
81 Some(existing) if existing.table_version >= entry.table_version => {}
82 _ => {
83 latest_versions.insert(entry.table_key.clone(), entry);
84 }
85 }
86 }
87
88 let mut tombstones = HashMap::<String, u64>::new();
89 for tombstone in scan.tombstones {
90 match tombstones.get(&tombstone.table_key) {
91 Some(existing) if *existing >= tombstone.tombstone_version => {}
92 _ => {
93 tombstones.insert(tombstone.table_key, tombstone.tombstone_version);
94 }
95 }
96 }
97
98 let mut entries: Vec<SubTableEntry> = latest_versions
99 .into_values()
100 .filter(|entry| {
101 tombstones
102 .get(&entry.table_key)
103 .map(|tombstone_version| *tombstone_version < entry.table_version)
104 .unwrap_or(true)
105 })
106 .collect();
107 entries.sort_by(|a, b| a.table_key.cmp(&b.table_key));
108
109 Ok(ManifestState { version, entries })
110}
111
112pub(super) async fn read_manifest_entries(dataset: &Dataset) -> Result<Vec<SubTableEntry>> {
113 Ok(read_manifest_scan(dataset).await?.version_entries)
114}
115
116pub(super) async fn read_registered_table_locations(
117 dataset: &Dataset,
118) -> Result<HashMap<String, String>> {
119 Ok(read_manifest_scan(dataset).await?.table_locations)
120}
121
122pub(super) async fn read_tombstone_versions(
123 dataset: &Dataset,
124) -> Result<HashMap<(String, u64), ()>> {
125 Ok(read_manifest_scan(dataset)
126 .await?
127 .tombstones
128 .into_iter()
129 .map(|tombstone| ((tombstone.table_key, tombstone.tombstone_version), ()))
130 .collect())
131}
132
133async fn read_manifest_scan(dataset: &Dataset) -> Result<ManifestScan> {
134 let batches: Vec<RecordBatch> = dataset
135 .scan()
136 .try_into_stream()
137 .await
138 .map_err(|e| OmniError::Lance(e.to_string()))?
139 .try_collect()
140 .await
141 .map_err(|e| OmniError::Lance(e.to_string()))?;
142
143 let mut table_locations = HashMap::new();
144 let mut version_entries = Vec::new();
145 let mut tombstones = Vec::new();
146
147 for batch in &batches {
148 let object_types = string_column(batch, "object_type")?;
149 let locations = string_column(batch, "location")?;
150 let metadata = string_column(batch, "metadata")?;
151 let table_keys = string_column(batch, "table_key")?;
152 let versions = u64_column(batch, "table_version")?;
153 let branches = string_column(batch, "table_branch")?;
154 let row_counts = u64_column(batch, "row_count")?;
155
156 for row in 0..batch.num_rows() {
157 let table_key = table_keys.value(row).to_string();
158 match object_types.value(row) {
159 OBJECT_TYPE_TABLE => {
160 if locations.is_null(row) {
161 return Err(OmniError::manifest_internal(format!(
162 "manifest table row missing location for {}",
163 table_key
164 )));
165 }
166 table_locations.insert(table_key, locations.value(row).to_string());
167 }
168 OBJECT_TYPE_TABLE_VERSION => {
169 let table_version = required_u64(versions, row, "table_version")?;
170 let row_count = required_u64(row_counts, row, "row_count")?;
171 if metadata.is_null(row) {
172 return Err(OmniError::manifest_internal(format!(
173 "manifest table_version row missing metadata for {}",
174 table_key
175 )));
176 }
177 let table_branch = if branches.is_null(row) {
178 None
179 } else {
180 Some(branches.value(row).to_string())
181 };
182 version_entries.push(SubTableEntry {
183 table_key: table_key.clone(),
184 table_path: String::new(),
185 table_version,
186 table_branch,
187 row_count,
188 version_metadata: TableVersionMetadata::from_json_str(metadata.value(row))?,
189 });
190 }
191 OBJECT_TYPE_TABLE_TOMBSTONE => {
192 let tombstone_version = required_u64(versions, row, "table_version")?;
193 tombstones.push(TableTombstoneEntry {
194 table_key,
195 tombstone_version,
196 });
197 }
198 _ => {}
199 }
200 }
201 }
202
203 let mut entries = version_entries
204 .into_iter()
205 .map(|mut entry| {
206 entry.table_path = table_locations
207 .get(&entry.table_key)
208 .cloned()
209 .ok_or_else(|| {
210 OmniError::manifest_internal(format!(
211 "manifest missing table row for {}",
212 entry.table_key
213 ))
214 })?;
215 Ok(entry)
216 })
217 .collect::<Result<Vec<_>>>()?;
218 entries.sort_by(|a, b| {
219 a.table_key
220 .cmp(&b.table_key)
221 .then(a.table_version.cmp(&b.table_version))
222 });
223
224 Ok(ManifestScan {
225 table_locations,
226 version_entries: entries,
227 tombstones,
228 })
229}
230
231pub(super) fn entries_to_batch(
232 entries: &[SubTableEntry],
233 version_metadata: &HashMap<String, String>,
234) -> Result<RecordBatch> {
235 let mut object_ids = Vec::with_capacity(entries.len() * 2);
236 let mut object_types = Vec::with_capacity(entries.len() * 2);
237 let mut locations = Vec::with_capacity(entries.len() * 2);
238 let mut metadata = Vec::with_capacity(entries.len() * 2);
239 let mut table_keys = Vec::with_capacity(entries.len() * 2);
240 let mut table_versions = Vec::with_capacity(entries.len() * 2);
241 let mut table_branches = Vec::with_capacity(entries.len() * 2);
242 let mut row_counts = Vec::with_capacity(entries.len() * 2);
243
244 for entry in entries {
245 object_ids.push(entry.table_key.clone());
246 object_types.push(OBJECT_TYPE_TABLE.to_string());
247 locations.push(Some(entry.table_path.clone()));
248 metadata.push(None);
249 table_keys.push(entry.table_key.clone());
250 table_versions.push(None);
251 table_branches.push(None);
252 row_counts.push(None);
253
254 object_ids.push(version_object_id(&entry.table_key, entry.table_version));
255 object_types.push(OBJECT_TYPE_TABLE_VERSION.to_string());
256 locations.push(None);
257 metadata.push(Some(
258 version_metadata
259 .get(&entry.table_key)
260 .cloned()
261 .ok_or_else(|| {
262 OmniError::manifest_internal(format!(
263 "missing initial version metadata for {}",
264 entry.table_key
265 ))
266 })?,
267 ));
268 table_keys.push(entry.table_key.clone());
269 table_versions.push(Some(entry.table_version));
270 table_branches.push(entry.table_branch.clone());
271 row_counts.push(Some(entry.row_count));
272 }
273
274 manifest_rows_batch(
275 object_ids,
276 object_types,
277 locations,
278 metadata,
279 table_keys,
280 table_versions,
281 table_branches,
282 row_counts,
283 )
284}
285
286pub(super) fn manifest_rows_batch(
287 object_ids: Vec<String>,
288 object_types: Vec<String>,
289 locations: Vec<Option<String>>,
290 metadata: Vec<Option<String>>,
291 table_keys: Vec<String>,
292 table_versions: Vec<Option<u64>>,
293 table_branches: Vec<Option<String>>,
294 row_counts: Vec<Option<u64>>,
295) -> Result<RecordBatch> {
296 let len = object_ids.len();
297 RecordBatch::try_new(
298 manifest_schema(),
299 vec![
300 Arc::new(StringArray::from(object_ids)),
301 Arc::new(StringArray::from(object_types)),
302 Arc::new(StringArray::from(locations)),
303 Arc::new(StringArray::from(metadata)),
304 new_null_array(
305 &DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
306 len,
307 ),
308 Arc::new(StringArray::from(table_keys)),
309 Arc::new(UInt64Array::from(table_versions)),
310 Arc::new(StringArray::from(table_branches)),
311 Arc::new(UInt64Array::from(row_counts)),
312 ],
313 )
314 .map_err(|e| OmniError::Lance(e.to_string()))
315}
316
317pub(super) fn string_column<'a>(batch: &'a RecordBatch, name: &str) -> Result<&'a StringArray> {
318 batch
319 .column_by_name(name)
320 .ok_or_else(|| {
321 OmniError::manifest_internal(format!("manifest batch missing '{name}' column"))
322 })?
323 .as_any()
324 .downcast_ref::<StringArray>()
325 .ok_or_else(|| {
326 OmniError::manifest_internal(format!("manifest column '{name}' is not Utf8"))
327 })
328}
329
330fn u64_column<'a>(batch: &'a RecordBatch, name: &str) -> Result<&'a UInt64Array> {
331 batch
332 .column_by_name(name)
333 .ok_or_else(|| {
334 OmniError::manifest_internal(format!("manifest batch missing '{name}' column"))
335 })?
336 .as_any()
337 .downcast_ref::<UInt64Array>()
338 .ok_or_else(|| {
339 OmniError::manifest_internal(format!("manifest column '{name}' is not UInt64"))
340 })
341}
342
343fn required_u64(column: &UInt64Array, row: usize, name: &str) -> Result<u64> {
344 if column.is_null(row) {
345 return Err(OmniError::manifest_internal(format!(
346 "manifest column '{name}' is null at row {row}"
347 )));
348 }
349 Ok(column.value(row))
350}