1use std::collections::HashMap;
2use std::sync::Arc;
3
4use arrow_array::{Array, RecordBatch, StringArray, UInt64Array, new_null_array};
5use arrow_schema::{DataType, Field, Schema, SchemaRef};
6use futures::TryStreamExt;
7use lance::Dataset;
8
9use crate::error::{OmniError, Result};
10
11use super::layout::version_object_id;
12use super::metadata::TableVersionMetadata;
13use super::{OBJECT_TYPE_TABLE, OBJECT_TYPE_TABLE_TOMBSTONE, OBJECT_TYPE_TABLE_VERSION};
14
/// One table-version row of the manifest, joined with its table's location.
///
/// `read_manifest_scan` builds these from rows whose `object_type` is
/// `OBJECT_TYPE_TABLE_VERSION`; `entries_to_batch` writes them back out.
#[derive(Debug, Clone)]
pub struct SubTableEntry {
    /// Key shared by all manifest rows of one table (the `table_key` column).
    pub table_key: String,
    /// Table location: the `location` column of the table's `OBJECT_TYPE_TABLE` row.
    pub table_path: String,
    /// Value of the `table_version` column.
    pub table_version: u64,
    /// Value of the `table_branch` column; `None` when that column is null.
    pub table_branch: Option<String>,
    /// Value of the `row_count` column.
    pub row_count: u64,
    /// Parsed with `TableVersionMetadata::from_json_str` from the `metadata` column.
    pub(crate) version_metadata: TableVersionMetadata,
}
24
/// Snapshot of the manifest as produced by `read_manifest_state`.
#[derive(Debug, Clone)]
pub(super) struct ManifestState {
    /// Version of the manifest dataset that was read (`dataset.version().version`).
    pub(super) version: u64,
    /// As built by `read_manifest_state`: the highest-versioned entry per table
    /// that is not hidden by a tombstone, sorted by `table_key`.
    pub(super) entries: Vec<SubTableEntry>,
}
30
/// A manifest row whose `object_type` is `OBJECT_TYPE_TABLE_TOMBSTONE`.
#[derive(Debug, Clone)]
struct TableTombstoneEntry {
    /// Table the tombstone applies to.
    table_key: String,
    /// Value of the row's `table_version` column. `read_manifest_state` hides a
    /// table's entry when its version is less than or equal to this value.
    tombstone_version: u64,
}
36
/// Raw, undeduplicated result of one full scan of the manifest dataset.
#[derive(Debug, Clone)]
struct ManifestScan {
    /// `table_key` -> `location` for every `OBJECT_TYPE_TABLE` row.
    table_locations: HashMap<String, String>,
    /// Every `OBJECT_TYPE_TABLE_VERSION` row (all versions, not only the latest),
    /// with `table_path` resolved, sorted by `table_key` then `table_version`.
    version_entries: Vec<SubTableEntry>,
    /// Every `OBJECT_TYPE_TABLE_TOMBSTONE` row, in scan order.
    tombstones: Vec<TableTombstoneEntry>,
}
43
44pub(super) fn manifest_schema() -> SchemaRef {
45 Arc::new(Schema::new(vec![
46 Field::new("object_id", DataType::Utf8, false),
47 Field::new("object_type", DataType::Utf8, false),
48 Field::new("location", DataType::Utf8, true),
49 Field::new("metadata", DataType::Utf8, true),
50 Field::new(
51 "base_objects",
52 DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
53 true,
54 ),
55 Field::new("table_key", DataType::Utf8, false),
56 Field::new("table_version", DataType::UInt64, true),
57 Field::new("table_branch", DataType::Utf8, true),
58 Field::new("row_count", DataType::UInt64, true),
59 ]))
60}
61
62pub(super) async fn read_manifest_state(dataset: &Dataset) -> Result<ManifestState> {
63 let version = dataset.version().version;
64 let scan = read_manifest_scan(dataset).await?;
65 let mut latest_versions = HashMap::<String, SubTableEntry>::new();
66
67 for entry in scan.version_entries {
68 match latest_versions.get(&entry.table_key) {
69 Some(existing) if existing.table_version >= entry.table_version => {}
70 _ => {
71 latest_versions.insert(entry.table_key.clone(), entry);
72 }
73 }
74 }
75
76 let mut tombstones = HashMap::<String, u64>::new();
77 for tombstone in scan.tombstones {
78 match tombstones.get(&tombstone.table_key) {
79 Some(existing) if *existing >= tombstone.tombstone_version => {}
80 _ => {
81 tombstones.insert(tombstone.table_key, tombstone.tombstone_version);
82 }
83 }
84 }
85
86 let mut entries: Vec<SubTableEntry> = latest_versions
87 .into_values()
88 .filter(|entry| {
89 tombstones
90 .get(&entry.table_key)
91 .map(|tombstone_version| *tombstone_version < entry.table_version)
92 .unwrap_or(true)
93 })
94 .collect();
95 entries.sort_by(|a, b| a.table_key.cmp(&b.table_key));
96
97 Ok(ManifestState { version, entries })
98}
99
100pub(super) async fn read_manifest_entries(dataset: &Dataset) -> Result<Vec<SubTableEntry>> {
101 Ok(read_manifest_scan(dataset).await?.version_entries)
102}
103
104pub(super) async fn read_registered_table_locations(
105 dataset: &Dataset,
106) -> Result<HashMap<String, String>> {
107 Ok(read_manifest_scan(dataset).await?.table_locations)
108}
109
110pub(super) async fn read_tombstone_versions(
111 dataset: &Dataset,
112) -> Result<HashMap<(String, u64), ()>> {
113 Ok(read_manifest_scan(dataset)
114 .await?
115 .tombstones
116 .into_iter()
117 .map(|tombstone| ((tombstone.table_key, tombstone.tombstone_version), ()))
118 .collect())
119}
120
121async fn read_manifest_scan(dataset: &Dataset) -> Result<ManifestScan> {
122 let batches: Vec<RecordBatch> = dataset
123 .scan()
124 .try_into_stream()
125 .await
126 .map_err(|e| OmniError::Lance(e.to_string()))?
127 .try_collect()
128 .await
129 .map_err(|e| OmniError::Lance(e.to_string()))?;
130
131 let mut table_locations = HashMap::new();
132 let mut version_entries = Vec::new();
133 let mut tombstones = Vec::new();
134
135 for batch in &batches {
136 let object_types = string_column(batch, "object_type")?;
137 let locations = string_column(batch, "location")?;
138 let metadata = string_column(batch, "metadata")?;
139 let table_keys = string_column(batch, "table_key")?;
140 let versions = u64_column(batch, "table_version")?;
141 let branches = string_column(batch, "table_branch")?;
142 let row_counts = u64_column(batch, "row_count")?;
143
144 for row in 0..batch.num_rows() {
145 let table_key = table_keys.value(row).to_string();
146 match object_types.value(row) {
147 OBJECT_TYPE_TABLE => {
148 if locations.is_null(row) {
149 return Err(OmniError::manifest_internal(format!(
150 "manifest table row missing location for {}",
151 table_key
152 )));
153 }
154 table_locations.insert(table_key, locations.value(row).to_string());
155 }
156 OBJECT_TYPE_TABLE_VERSION => {
157 let table_version = required_u64(versions, row, "table_version")?;
158 let row_count = required_u64(row_counts, row, "row_count")?;
159 if metadata.is_null(row) {
160 return Err(OmniError::manifest_internal(format!(
161 "manifest table_version row missing metadata for {}",
162 table_key
163 )));
164 }
165 let table_branch = if branches.is_null(row) {
166 None
167 } else {
168 Some(branches.value(row).to_string())
169 };
170 version_entries.push(SubTableEntry {
171 table_key: table_key.clone(),
172 table_path: String::new(),
173 table_version,
174 table_branch,
175 row_count,
176 version_metadata: TableVersionMetadata::from_json_str(metadata.value(row))?,
177 });
178 }
179 OBJECT_TYPE_TABLE_TOMBSTONE => {
180 let tombstone_version = required_u64(versions, row, "table_version")?;
181 tombstones.push(TableTombstoneEntry {
182 table_key,
183 tombstone_version,
184 });
185 }
186 _ => {}
187 }
188 }
189 }
190
191 let mut entries = version_entries
192 .into_iter()
193 .map(|mut entry| {
194 entry.table_path = table_locations
195 .get(&entry.table_key)
196 .cloned()
197 .ok_or_else(|| {
198 OmniError::manifest_internal(format!(
199 "manifest missing table row for {}",
200 entry.table_key
201 ))
202 })?;
203 Ok(entry)
204 })
205 .collect::<Result<Vec<_>>>()?;
206 entries.sort_by(|a, b| {
207 a.table_key
208 .cmp(&b.table_key)
209 .then(a.table_version.cmp(&b.table_version))
210 });
211
212 Ok(ManifestScan {
213 table_locations,
214 version_entries: entries,
215 tombstones,
216 })
217}
218
219pub(super) fn entries_to_batch(
220 entries: &[SubTableEntry],
221 version_metadata: &HashMap<String, String>,
222) -> Result<RecordBatch> {
223 let mut object_ids = Vec::with_capacity(entries.len() * 2);
224 let mut object_types = Vec::with_capacity(entries.len() * 2);
225 let mut locations = Vec::with_capacity(entries.len() * 2);
226 let mut metadata = Vec::with_capacity(entries.len() * 2);
227 let mut table_keys = Vec::with_capacity(entries.len() * 2);
228 let mut table_versions = Vec::with_capacity(entries.len() * 2);
229 let mut table_branches = Vec::with_capacity(entries.len() * 2);
230 let mut row_counts = Vec::with_capacity(entries.len() * 2);
231
232 for entry in entries {
233 object_ids.push(entry.table_key.clone());
234 object_types.push(OBJECT_TYPE_TABLE.to_string());
235 locations.push(Some(entry.table_path.clone()));
236 metadata.push(None);
237 table_keys.push(entry.table_key.clone());
238 table_versions.push(None);
239 table_branches.push(None);
240 row_counts.push(None);
241
242 object_ids.push(version_object_id(&entry.table_key, entry.table_version));
243 object_types.push(OBJECT_TYPE_TABLE_VERSION.to_string());
244 locations.push(None);
245 metadata.push(Some(
246 version_metadata
247 .get(&entry.table_key)
248 .cloned()
249 .ok_or_else(|| {
250 OmniError::manifest_internal(format!(
251 "missing initial version metadata for {}",
252 entry.table_key
253 ))
254 })?,
255 ));
256 table_keys.push(entry.table_key.clone());
257 table_versions.push(Some(entry.table_version));
258 table_branches.push(entry.table_branch.clone());
259 row_counts.push(Some(entry.row_count));
260 }
261
262 manifest_rows_batch(
263 object_ids,
264 object_types,
265 locations,
266 metadata,
267 table_keys,
268 table_versions,
269 table_branches,
270 row_counts,
271 )
272}
273
274pub(super) fn manifest_rows_batch(
275 object_ids: Vec<String>,
276 object_types: Vec<String>,
277 locations: Vec<Option<String>>,
278 metadata: Vec<Option<String>>,
279 table_keys: Vec<String>,
280 table_versions: Vec<Option<u64>>,
281 table_branches: Vec<Option<String>>,
282 row_counts: Vec<Option<u64>>,
283) -> Result<RecordBatch> {
284 let len = object_ids.len();
285 RecordBatch::try_new(
286 manifest_schema(),
287 vec![
288 Arc::new(StringArray::from(object_ids)),
289 Arc::new(StringArray::from(object_types)),
290 Arc::new(StringArray::from(locations)),
291 Arc::new(StringArray::from(metadata)),
292 new_null_array(
293 &DataType::List(Arc::new(Field::new("item", DataType::Utf8, true))),
294 len,
295 ),
296 Arc::new(StringArray::from(table_keys)),
297 Arc::new(UInt64Array::from(table_versions)),
298 Arc::new(StringArray::from(table_branches)),
299 Arc::new(UInt64Array::from(row_counts)),
300 ],
301 )
302 .map_err(|e| OmniError::Lance(e.to_string()))
303}
304
305pub(super) fn string_column<'a>(batch: &'a RecordBatch, name: &str) -> Result<&'a StringArray> {
306 batch
307 .column_by_name(name)
308 .ok_or_else(|| {
309 OmniError::manifest_internal(format!("manifest batch missing '{name}' column"))
310 })?
311 .as_any()
312 .downcast_ref::<StringArray>()
313 .ok_or_else(|| {
314 OmniError::manifest_internal(format!("manifest column '{name}' is not Utf8"))
315 })
316}
317
318fn u64_column<'a>(batch: &'a RecordBatch, name: &str) -> Result<&'a UInt64Array> {
319 batch
320 .column_by_name(name)
321 .ok_or_else(|| {
322 OmniError::manifest_internal(format!("manifest batch missing '{name}' column"))
323 })?
324 .as_any()
325 .downcast_ref::<UInt64Array>()
326 .ok_or_else(|| {
327 OmniError::manifest_internal(format!("manifest column '{name}' is not UInt64"))
328 })
329}
330
331fn required_u64(column: &UInt64Array, row: usize, name: &str) -> Result<u64> {
332 if column.is_null(row) {
333 return Err(OmniError::manifest_internal(format!(
334 "manifest column '{name}' is null at row {row}"
335 )));
336 }
337 Ok(column.value(row))
338}