Skip to main content

uni_store/backend/
traits.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright 2024-2026 Dragonscale Team
3
4//! Core [`StorageBackend`] trait definition.
5
6use std::pin::Pin;
7use std::sync::Arc;
8
9use anyhow::Result;
10use arrow_array::RecordBatch;
11use arrow_schema::Schema as ArrowSchema;
12use async_trait::async_trait;
13use futures::Stream;
14
15use super::types::*;
16
/// A record batch stream returned by [`StorageBackend::scan_stream`].
///
/// The stream is pinned and boxed behind a trait object and is [`Send`]. Every
/// item is a fallible [`RecordBatch`], so an error can surface on any
/// individual item rather than only when the stream is created.
pub type RecordBatchStream = Pin<Box<dyn Stream<Item = Result<RecordBatch>> + Send>>;
19
/// RAII guard serializing writes to a single table, obtained from
/// [`StorageBackend::lock_table_for_write`].
///
/// Meant to be held across a multi-step read-modify-write — for example an
/// index backfill's scan → transform → [`StorageBackend::replace_table_atomic`]
/// sequence — so that a concurrent [`StorageBackend::write`] append cannot
/// interleave between the read and the overwrite and be silently discarded.
///
/// For backends without per-table write locking the guard is a no-op.
#[must_use = "the table write lock is released as soon as the guard is dropped"]
pub struct TableWriteGuard(
    // Never read: the field is kept purely for its `Drop` side effect, which
    // releases the per-table mutex when the guard goes out of scope.
    #[expect(dead_code, reason = "guard is held only to release the lock on drop")]
    Option<tokio::sync::OwnedMutexGuard<()>>,
);
34
35impl std::fmt::Debug for TableWriteGuard {
36    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
37        f.debug_struct("TableWriteGuard").finish_non_exhaustive()
38    }
39}
40
41impl TableWriteGuard {
42    /// A no-op guard for backends that do not serialize writes per table.
43    pub fn none() -> Self {
44        Self(None)
45    }
46
47    /// Wrap an owned per-table mutex guard.
48    pub fn held(guard: tokio::sync::OwnedMutexGuard<()>) -> Self {
49        Self(Some(guard))
50    }
51}
52
53/// Core storage backend trait.
54///
55/// All persistent storage operations go through this trait. Backends must be
56/// thread-safe ([`Send`] + [`Sync`]) and have a static lifetime for use with
57/// `Arc<dyn StorageBackend>`.
58///
59/// # Design Principles
60///
61/// - **Arrow-native**: All data interchange uses Arrow [`RecordBatch`].
62/// - **SQL-string filters**: Filter expressions use SQL-like strings initially.
63///   Backends that don't support SQL must parse/translate these strings.
64/// - **Capabilities via default methods**: Optional features (vector search, FTS)
65///   have default implementations that return "not supported" errors.
66/// - **Table-level operations**: The backend manages individual tables (not the
67///   higher-level graph schema). Table naming conventions are in [`super::table_names`].
68#[async_trait]
69pub trait StorageBackend: Send + Sync + 'static {
70    // ========================
71    // Table Lifecycle
72    // ========================
73
74    /// List all table names in the backend.
75    async fn table_names(&self) -> Result<Vec<String>>;
76
77    /// Check if a table exists.
78    async fn table_exists(&self, name: &str) -> Result<bool>;
79
80    /// Create a new table with initial data batches.
81    async fn create_table(&self, name: &str, batches: Vec<RecordBatch>) -> Result<()>;
82
83    /// Create a new empty table with the given schema.
84    async fn create_empty_table(&self, name: &str, schema: Arc<ArrowSchema>) -> Result<()>;
85
86    /// Open a table if it exists, or create it with the given schema.
87    async fn open_or_create_table(&self, name: &str, schema: Arc<ArrowSchema>) -> Result<()>;
88
89    /// Drop a table by name.
90    async fn drop_table(&self, name: &str) -> Result<()>;
91
92    /// Notify the backend that a table now exists, even though no
93    /// `create_table` / `create_empty_table` / `open_or_create_table`
94    /// went through this trait. The default implementation is a
95    /// no-op; backends that cache existence (e.g. `LanceDbBackend`'s
96    /// `existence_cache` from issue #55) override to invalidate the
97    /// stale negative entry. Used by `BranchedBackend` after it
98    /// creates a fork-side dataset directly through the Lance branch
99    /// primitives — that path does not call `create_table` on the
100    /// inner backend, so without this hook the inner backend's
101    /// existence cache silently keeps reporting `false` for the
102    /// just-created table.
103    async fn notify_table_created(&self, name: &str) {
104        let _ = name;
105    }
106
107    // ========================
108    // Read Operations
109    // ========================
110
111    /// Scan a table, collecting all matching rows into batches.
112    async fn scan(&self, request: ScanRequest) -> Result<Vec<RecordBatch>>;
113
114    /// Scan a table, returning a streaming iterator over record batches.
115    async fn scan_stream(&self, request: ScanRequest) -> Result<RecordBatchStream>;
116
117    /// Get the Arrow schema for a table. Returns `None` if the table doesn't exist.
118    async fn get_table_schema(&self, name: &str) -> Result<Option<Arc<ArrowSchema>>>;
119
120    /// Count rows in a table, optionally with a filter.
121    async fn count_rows(&self, table_name: &str, filter: Option<&str>) -> Result<usize>;
122
123    // ========================
124    // Write Operations
125    // ========================
126
127    /// Write record batches to a table.
128    async fn write(
129        &self,
130        table_name: &str,
131        batches: Vec<RecordBatch>,
132        mode: WriteMode,
133    ) -> Result<()>;
134
135    /// Upsert via Lance MergeInsert. Source rows are joined to the
136    /// target on the columns in `on`; matched rows have `UpdateAll`
137    /// applied (i.e. every column present in the source overrides the
138    /// target's value for that column; columns not in the source are
139    /// preserved). Unmatched source rows are DROPPED — partial writes
140    /// never INSERT (CREATE goes through `write` with `WriteMode::Append`).
141    ///
142    /// Used by `Writer::flush_stream_l1` when
143    /// `UniConfig::partial_lance_writes` is on.
144    async fn merge_insert(
145        &self,
146        _table_name: &str,
147        _on: &[&str],
148        _batches: Vec<RecordBatch>,
149    ) -> Result<()> {
150        anyhow::bail!("merge_insert not supported by this backend")
151    }
152
153    /// Delete rows matching a filter expression.
154    async fn delete_rows(&self, table_name: &str, filter: &str) -> Result<()>;
155
156    /// Atomically replace a table's contents.
157    ///
158    /// Handles the case where batches may be empty (clears the table) and the
159    /// table may not exist yet (creates it).
160    async fn replace_table_atomic(
161        &self,
162        name: &str,
163        batches: Vec<RecordBatch>,
164        schema: Arc<ArrowSchema>,
165    ) -> Result<()>;
166
167    /// Acquire the per-table write lock, returning a guard held until dropped.
168    ///
169    /// A caller performing a read-modify-write that spans multiple backend calls —
170    /// a scan followed by [`Self::replace_table_atomic`], as the MUVERA FDE backfill
171    /// does — must hold this across the whole sequence. Otherwise a concurrent
172    /// [`Self::write`] append can land between the read and the full-table overwrite
173    /// and be silently lost. [`Self::write`] / [`Self::merge_insert`] take the same
174    /// lock internally, so holding it here makes them mutually exclusive.
175    ///
176    /// Backends without per-table write locking return a no-op guard.
177    async fn lock_table_for_write(&self, name: &str) -> TableWriteGuard {
178        let _ = name;
179        TableWriteGuard::none()
180    }
181
182    // ========================
183    // Versioning / MVCC
184    // ========================
185
186    /// Get the current version of a table. Returns `None` if the table doesn't exist.
187    async fn get_table_version(&self, table_name: &str) -> Result<Option<u64>>;
188
189    /// Roll back a table to a specific version.
190    async fn rollback_table(&self, table_name: &str, target_version: u64) -> Result<()>;
191
192    // ========================
193    // Maintenance
194    // ========================
195
196    /// Optimize a table (compaction, cleanup, etc.).
197    async fn optimize_table(&self, table_name: &str) -> Result<()>;
198
199    /// Recover a table from crash state (incomplete staging writes, etc.).
200    async fn recover_staging(&self, table_name: &str) -> Result<()>;
201
202    // ========================
203    // Cache Management
204    // ========================
205
206    /// Invalidate any cached state for a table.
207    fn invalidate_cache(&self, _table_name: &str) {}
208
209    /// Clear all cached state.
210    fn clear_cache(&self) {}
211
212    // ========================
213    // Metadata
214    // ========================
215
216    /// Get the base URI for this backend's storage location.
217    fn base_uri(&self) -> &str;
218
219    // ========================
220    // Capability Checks
221    // ========================
222
223    /// Whether this backend supports vector similarity search.
224    fn supports_vector_search(&self) -> bool {
225        false
226    }
227
228    /// Whether this backend supports full-text search.
229    fn supports_full_text_search(&self) -> bool {
230        false
231    }
232
233    /// Whether this backend supports scalar indexes.
234    fn supports_scalar_index(&self) -> bool {
235        false
236    }
237
238    // ========================
239    // Optional Capabilities
240    // ========================
241
242    /// Perform a vector similarity search.
243    #[expect(clippy::too_many_arguments)]
244    async fn vector_search(
245        &self,
246        _table: &str,
247        _column: &str,
248        _query: &[f32],
249        _k: usize,
250        _metric: DistanceMetric,
251        _filter: FilterExpr,
252        _opts: VectorQueryOpts,
253    ) -> Result<Vec<RecordBatch>> {
254        anyhow::bail!("Vector search not supported by this backend")
255    }
256
257    /// Late-interaction (ColBERT / MaxSim) search over a multi-vector column.
258    ///
259    /// `query` is a set of per-token vectors; each row's `List<FixedSizeList>`
260    /// column is scored by MaxSim. Defaults to unsupported.
261    #[expect(clippy::too_many_arguments)]
262    async fn multivector_search(
263        &self,
264        _table: &str,
265        _column: &str,
266        _query: &[Vec<f32>],
267        _k: usize,
268        _metric: DistanceMetric,
269        _filter: FilterExpr,
270        _opts: VectorQueryOpts,
271    ) -> Result<Vec<RecordBatch>> {
272        anyhow::bail!("Multi-vector search not supported by this backend")
273    }
274
275    /// Perform a full-text search.
276    async fn full_text_search(
277        &self,
278        _table: &str,
279        _column: &str,
280        _query: &str,
281        _k: usize,
282        _filter: FilterExpr,
283    ) -> Result<Vec<RecordBatch>> {
284        anyhow::bail!("Full-text search not supported by this backend")
285    }
286
287    /// Create a named vector (ANN) index on a column with the given parameters.
288    ///
289    /// `name` is the index name to assign; an existing index of the same name is
290    /// replaced. `params` selects the physical index shape and metric.
291    ///
292    /// # Errors
293    /// Returns an error if the backend does not support vector indexing or the
294    /// build fails.
295    async fn create_vector_index(
296        &self,
297        _table: &str,
298        _column: &str,
299        _name: &str,
300        _params: VectorIndexParams,
301    ) -> Result<()> {
302        anyhow::bail!("Vector indexing not supported by this backend")
303    }
304
305    /// Create a full-text search index over one or more columns.
306    ///
307    /// `name` is the index name (`None` lets the backend choose a default).
308    /// `with_positions` enables phrase/position postings.
309    ///
310    /// # Errors
311    /// Returns an error if the backend does not support FTS or the build fails.
312    async fn create_fts_index(
313        &self,
314        _table: &str,
315        _columns: &[&str],
316        _name: Option<&str>,
317        _with_positions: bool,
318    ) -> Result<()> {
319        anyhow::bail!("FTS indexing not supported by this backend")
320    }
321
322    /// Create a scalar index over one or more columns.
323    ///
324    /// `name` is the index name (`None` lets the backend choose a default).
325    ///
326    /// # Errors
327    /// Returns an error if the backend does not support scalar indexing or the
328    /// build fails.
329    async fn create_scalar_index(
330        &self,
331        _table: &str,
332        _columns: &[&str],
333        _index_type: ScalarIndexType,
334        _name: Option<&str>,
335    ) -> Result<()> {
336        anyhow::bail!("Scalar indexing not supported by this backend")
337    }
338
339    /// Drop an index by name.
340    async fn drop_index(&self, _table: &str, _index_name: &str) -> Result<()> {
341        anyhow::bail!("Index drop not supported by this backend")
342    }
343
344    /// List all indexes on a table.
345    async fn list_indexes(&self, _table: &str) -> Result<Vec<IndexInfo>> {
346        Ok(vec![])
347    }
348}