uni_store/backend/traits.rs
1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Core [`StorageBackend`] trait definition.
5
6use std::pin::Pin;
7use std::sync::Arc;
8
9use anyhow::Result;
10use arrow_array::RecordBatch;
11use arrow_schema::Schema as ArrowSchema;
12use async_trait::async_trait;
13use futures::Stream;
14
15use super::types::*;
16
17/// A record batch stream returned by [`StorageBackend::scan_stream`].
18pub type RecordBatchStream = Pin<Box<dyn Stream<Item = Result<RecordBatch>> + Send>>;
19
20/// RAII guard serializing writes to a single table, from
21/// [`StorageBackend::lock_table_for_write`].
22///
23/// Held across a multi-step read-modify-write (e.g. an index backfill's
24/// scan → transform → [`StorageBackend::replace_table_atomic`]) so a concurrent
25/// [`StorageBackend::write`] append cannot interleave between the read and the
26/// overwrite and be silently discarded. A no-op for backends without per-table
27/// write locking.
28#[must_use = "the table write lock is released as soon as the guard is dropped"]
29pub struct TableWriteGuard(
30 // Held purely for its `Drop` side effect (releasing the per-table mutex).
31 #[expect(dead_code, reason = "guard is held only to release the lock on drop")]
32 Option<tokio::sync::OwnedMutexGuard<()>>,
33);
34
35impl std::fmt::Debug for TableWriteGuard {
36 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37 f.debug_struct("TableWriteGuard").finish_non_exhaustive()
38 }
39}
40
41impl TableWriteGuard {
42 /// A no-op guard for backends that do not serialize writes per table.
43 pub fn none() -> Self {
44 Self(None)
45 }
46
47 /// Wrap an owned per-table mutex guard.
48 pub fn held(guard: tokio::sync::OwnedMutexGuard<()>) -> Self {
49 Self(Some(guard))
50 }
51}
52
53/// Core storage backend trait.
54///
55/// All persistent storage operations go through this trait. Backends must be
56/// thread-safe ([`Send`] + [`Sync`]) and have a static lifetime for use with
57/// `Arc<dyn StorageBackend>`.
58///
59/// # Design Principles
60///
61/// - **Arrow-native**: All data interchange uses Arrow [`RecordBatch`].
62/// - **SQL-string filters**: Filter expressions use SQL-like strings initially.
63/// Backends that don't support SQL must parse/translate these strings.
64/// - **Capabilities via default methods**: Optional features (vector search, FTS)
65/// have default implementations that return "not supported" errors.
66/// - **Table-level operations**: The backend manages individual tables (not the
67/// higher-level graph schema). Table naming conventions are in [`super::table_names`].
68#[async_trait]
69pub trait StorageBackend: Send + Sync + 'static {
70 // ========================
71 // Table Lifecycle
72 // ========================
73
74 /// List all table names in the backend.
75 async fn table_names(&self) -> Result<Vec<String>>;
76
77 /// Check if a table exists.
78 async fn table_exists(&self, name: &str) -> Result<bool>;
79
80 /// Create a new table with initial data batches.
81 async fn create_table(&self, name: &str, batches: Vec<RecordBatch>) -> Result<()>;
82
83 /// Create a new empty table with the given schema.
84 async fn create_empty_table(&self, name: &str, schema: Arc<ArrowSchema>) -> Result<()>;
85
86 /// Open a table if it exists, or create it with the given schema.
87 async fn open_or_create_table(&self, name: &str, schema: Arc<ArrowSchema>) -> Result<()>;
88
89 /// Drop a table by name.
90 async fn drop_table(&self, name: &str) -> Result<()>;
91
92 /// Notify the backend that a table now exists, even though no
93 /// `create_table` / `create_empty_table` / `open_or_create_table`
94 /// went through this trait. The default implementation is a
95 /// no-op; backends that cache existence (e.g. `LanceDbBackend`'s
96 /// `existence_cache` from issue #55) override to invalidate the
97 /// stale negative entry. Used by `BranchedBackend` after it
98 /// creates a fork-side dataset directly through the Lance branch
99 /// primitives — that path does not call `create_table` on the
100 /// inner backend, so without this hook the inner backend's
101 /// existence cache silently keeps reporting `false` for the
102 /// just-created table.
103 async fn notify_table_created(&self, name: &str) {
104 let _ = name;
105 }
106
107 // ========================
108 // Read Operations
109 // ========================
110
111 /// Scan a table, collecting all matching rows into batches.
112 async fn scan(&self, request: ScanRequest) -> Result<Vec<RecordBatch>>;
113
114 /// Scan a table, returning a streaming iterator over record batches.
115 async fn scan_stream(&self, request: ScanRequest) -> Result<RecordBatchStream>;
116
117 /// Get the Arrow schema for a table. Returns `None` if the table doesn't exist.
118 async fn get_table_schema(&self, name: &str) -> Result<Option<Arc<ArrowSchema>>>;
119
120 /// Count rows in a table, optionally with a filter.
121 async fn count_rows(&self, table_name: &str, filter: Option<&str>) -> Result<usize>;
122
123 // ========================
124 // Write Operations
125 // ========================
126
127 /// Write record batches to a table.
128 async fn write(
129 &self,
130 table_name: &str,
131 batches: Vec<RecordBatch>,
132 mode: WriteMode,
133 ) -> Result<()>;
134
135 /// Upsert via Lance MergeInsert. Source rows are joined to the
136 /// target on the columns in `on`; matched rows have `UpdateAll`
137 /// applied (i.e. every column present in the source overrides the
138 /// target's value for that column; columns not in the source are
139 /// preserved). Unmatched source rows are DROPPED — partial writes
140 /// never INSERT (CREATE goes through `write` with `WriteMode::Append`).
141 ///
142 /// Used by `Writer::flush_stream_l1` when
143 /// `UniConfig::partial_lance_writes` is on.
144 async fn merge_insert(
145 &self,
146 _table_name: &str,
147 _on: &[&str],
148 _batches: Vec<RecordBatch>,
149 ) -> Result<()> {
150 anyhow::bail!("merge_insert not supported by this backend")
151 }
152
153 /// Delete rows matching a filter expression.
154 async fn delete_rows(&self, table_name: &str, filter: &str) -> Result<()>;
155
156 /// Atomically replace a table's contents.
157 ///
158 /// Handles the case where batches may be empty (clears the table) and the
159 /// table may not exist yet (creates it).
160 async fn replace_table_atomic(
161 &self,
162 name: &str,
163 batches: Vec<RecordBatch>,
164 schema: Arc<ArrowSchema>,
165 ) -> Result<()>;
166
167 /// Acquire the per-table write lock, returning a guard held until dropped.
168 ///
169 /// A caller performing a read-modify-write that spans multiple backend calls —
170 /// a scan followed by [`Self::replace_table_atomic`], as the MUVERA FDE backfill
171 /// does — must hold this across the whole sequence. Otherwise a concurrent
172 /// [`Self::write`] append can land between the read and the full-table overwrite
173 /// and be silently lost. [`Self::write`] / [`Self::merge_insert`] take the same
174 /// lock internally, so holding it here makes them mutually exclusive.
175 ///
176 /// Backends without per-table write locking return a no-op guard.
177 async fn lock_table_for_write(&self, name: &str) -> TableWriteGuard {
178 let _ = name;
179 TableWriteGuard::none()
180 }
181
182 // ========================
183 // Versioning / MVCC
184 // ========================
185
186 /// Get the current version of a table. Returns `None` if the table doesn't exist.
187 async fn get_table_version(&self, table_name: &str) -> Result<Option<u64>>;
188
189 /// Roll back a table to a specific version.
190 async fn rollback_table(&self, table_name: &str, target_version: u64) -> Result<()>;
191
192 // ========================
193 // Maintenance
194 // ========================
195
196 /// Optimize a table (compaction, cleanup, etc.).
197 async fn optimize_table(&self, table_name: &str) -> Result<()>;
198
199 /// Recover a table from crash state (incomplete staging writes, etc.).
200 async fn recover_staging(&self, table_name: &str) -> Result<()>;
201
202 // ========================
203 // Cache Management
204 // ========================
205
206 /// Invalidate any cached state for a table.
207 fn invalidate_cache(&self, _table_name: &str) {}
208
209 /// Clear all cached state.
210 fn clear_cache(&self) {}
211
212 // ========================
213 // Metadata
214 // ========================
215
216 /// Get the base URI for this backend's storage location.
217 fn base_uri(&self) -> &str;
218
219 // ========================
220 // Capability Checks
221 // ========================
222
223 /// Whether this backend supports vector similarity search.
224 fn supports_vector_search(&self) -> bool {
225 false
226 }
227
228 /// Whether this backend supports full-text search.
229 fn supports_full_text_search(&self) -> bool {
230 false
231 }
232
233 /// Whether this backend supports scalar indexes.
234 fn supports_scalar_index(&self) -> bool {
235 false
236 }
237
238 // ========================
239 // Optional Capabilities
240 // ========================
241
242 /// Perform a vector similarity search.
243 #[expect(clippy::too_many_arguments)]
244 async fn vector_search(
245 &self,
246 _table: &str,
247 _column: &str,
248 _query: &[f32],
249 _k: usize,
250 _metric: DistanceMetric,
251 _filter: FilterExpr,
252 _opts: VectorQueryOpts,
253 ) -> Result<Vec<RecordBatch>> {
254 anyhow::bail!("Vector search not supported by this backend")
255 }
256
257 /// Late-interaction (ColBERT / MaxSim) search over a multi-vector column.
258 ///
259 /// `query` is a set of per-token vectors; each row's `List<FixedSizeList>`
260 /// column is scored by MaxSim. Defaults to unsupported.
261 #[expect(clippy::too_many_arguments)]
262 async fn multivector_search(
263 &self,
264 _table: &str,
265 _column: &str,
266 _query: &[Vec<f32>],
267 _k: usize,
268 _metric: DistanceMetric,
269 _filter: FilterExpr,
270 _opts: VectorQueryOpts,
271 ) -> Result<Vec<RecordBatch>> {
272 anyhow::bail!("Multi-vector search not supported by this backend")
273 }
274
275 /// Perform a full-text search.
276 async fn full_text_search(
277 &self,
278 _table: &str,
279 _column: &str,
280 _query: &str,
281 _k: usize,
282 _filter: FilterExpr,
283 ) -> Result<Vec<RecordBatch>> {
284 anyhow::bail!("Full-text search not supported by this backend")
285 }
286
287 /// Create a named vector (ANN) index on a column with the given parameters.
288 ///
289 /// `name` is the index name to assign; an existing index of the same name is
290 /// replaced. `params` selects the physical index shape and metric.
291 ///
292 /// # Errors
293 /// Returns an error if the backend does not support vector indexing or the
294 /// build fails.
295 async fn create_vector_index(
296 &self,
297 _table: &str,
298 _column: &str,
299 _name: &str,
300 _params: VectorIndexParams,
301 ) -> Result<()> {
302 anyhow::bail!("Vector indexing not supported by this backend")
303 }
304
305 /// Create a full-text search index over one or more columns.
306 ///
307 /// `name` is the index name (`None` lets the backend choose a default).
308 /// `with_positions` enables phrase/position postings.
309 ///
310 /// # Errors
311 /// Returns an error if the backend does not support FTS or the build fails.
312 async fn create_fts_index(
313 &self,
314 _table: &str,
315 _columns: &[&str],
316 _name: Option<&str>,
317 _with_positions: bool,
318 ) -> Result<()> {
319 anyhow::bail!("FTS indexing not supported by this backend")
320 }
321
322 /// Create a scalar index over one or more columns.
323 ///
324 /// `name` is the index name (`None` lets the backend choose a default).
325 ///
326 /// # Errors
327 /// Returns an error if the backend does not support scalar indexing or the
328 /// build fails.
329 async fn create_scalar_index(
330 &self,
331 _table: &str,
332 _columns: &[&str],
333 _index_type: ScalarIndexType,
334 _name: Option<&str>,
335 ) -> Result<()> {
336 anyhow::bail!("Scalar indexing not supported by this backend")
337 }
338
339 /// Drop an index by name.
340 async fn drop_index(&self, _table: &str, _index_name: &str) -> Result<()> {
341 anyhow::bail!("Index drop not supported by this backend")
342 }
343
344 /// List all indexes on a table.
345 async fn list_indexes(&self, _table: &str) -> Result<Vec<IndexInfo>> {
346 Ok(vec![])
347 }
348}