Skip to main content

uni_plugin/traits/
storage.rs

//! Storage backend plugins — `MATCH` / `CREATE` against pluggable stores.
//!
//! M5a (2026-05-24): traits are `#[async_trait]`. Real backends such as the
//! `lance://` adapter in `uni-plugin-builtin` need to drive async I/O
//! through `uni-store`; the plugin surface mirrors that shape so adapters
//! don't have to fabricate a blocking runtime per call.
7
8use std::sync::Arc;
9
10use arrow_schema::SchemaRef;
11use async_trait::async_trait;
12use datafusion::arrow::record_batch::RecordBatch;
13use datafusion::execution::SendableRecordBatchStream;
14use datafusion::logical_expr::Expr;
15
16use crate::errors::FnError;
17
/// Options passed to [`StorageBackend::open`] when a backend is opened.
///
/// The [`Default`] value carries an empty configuration string. Options
/// compare by value, so callers can check whether two opens were configured
/// identically.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct StorageOptions {
    /// Free-form JSON configuration; no schema is enforced at this layer.
    pub config_json: String,
}
24
/// Opaque write handle returned by [`Storage::write_batch`].
///
/// Handles compare and hash by their `id`. Callers can therefore deduplicate
/// the writes they have issued or use handles as map/set keys.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct WriteHandle {
    /// Backend-specific identifier (LSN, transaction id, …).
    pub id: u64,
}
31
/// Metadata returned by [`Storage::fork`] describing the newly-created branch.
///
/// `parent_version` is the backend version pinned as the fork-point. Callers
/// orchestrating nested forks can therefore chain `create_branch_from`-style
/// calls without re-querying the backend.
///
/// `branch_name` echoes the `dst_branch` argument. It is surfaced explicitly
/// so that backends which canonicalize names can return the resolved form.
///
/// Values compare and hash field-by-field.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct BranchMetadata {
    /// Backend version pinned as the new branch's fork-point.
    pub parent_version: u64,
    /// Branch identifier as registered on the backend.
    pub branch_name: String,
}
46
47/// A storage backend identified by URI scheme.
48#[async_trait]
49pub trait StorageBackend: Send + Sync {
50    /// URI scheme this backend handles (`"lance"`, `"s3"`, `"memory"`).
51    fn scheme(&self) -> &'static str;
52
53    /// Open the backend at `uri`.
54    ///
55    /// # Errors
56    ///
57    /// Returns [`FnError`] if the URI is malformed or the backend cannot
58    /// be opened (auth failure, network error).
59    async fn open(&self, uri: &str, options: &StorageOptions) -> Result<Arc<dyn Storage>, FnError>;
60}
61
62/// Per-instance storage interface.
63#[async_trait]
64pub trait Storage: Send + Sync {
65    /// Stream batches from `table` matching `predicate`.
66    ///
67    /// `predicate = None` means a full scan.
68    ///
69    /// # Errors
70    ///
71    /// Returns [`FnError`] if the read cannot start.
72    async fn read_batch(
73        &self,
74        table: &str,
75        predicate: Option<&Expr>,
76    ) -> Result<SendableRecordBatchStream, FnError>;
77
78    /// Write a single batch to `table`.
79    ///
80    /// # Errors
81    ///
82    /// Returns [`FnError`] on write failure.
83    async fn write_batch(&self, table: &str, batch: &RecordBatch) -> Result<WriteHandle, FnError>;
84
85    /// List tables known to this backend.
86    ///
87    /// # Errors
88    ///
89    /// Returns [`FnError`] if the listing cannot complete.
90    async fn list_tables(&self) -> Result<Vec<String>, FnError>;
91
92    /// Delete rows in `table` matching `predicate`. Returns the number of
93    /// rows actually deleted.
94    ///
95    /// # Errors
96    ///
97    /// Returns [`FnError`] on delete failure.
98    async fn delete(&self, table: &str, predicate: &Expr) -> Result<u64, FnError>;
99
100    /// Whether this backend supports branched / forked state.
101    fn supports_branching(&self) -> bool {
102        false
103    }
104
105    /// Fork `src_branch` of `table` into `dst_branch`. Default: unsupported.
106    ///
107    /// Granularity is per-dataset (`table`) because real branching backends
108    /// (Lance) track branches and versions independently per dataset.
109    /// Multi-dataset orchestration (atomic across all tables of a logical
110    /// fork) is the caller's responsibility.
111    ///
112    /// # Errors
113    ///
114    /// Returns [`FnError`] if branching is not supported or the fork
115    /// operation fails (missing source branch, name collision, I/O).
116    async fn fork(
117        &self,
118        _table: &str,
119        _src_branch: &str,
120        _dst_branch: &str,
121    ) -> Result<BranchMetadata, FnError> {
122        Err(FnError::new(
123            0x10,
124            "storage backend does not support branching",
125        ))
126    }
127
128    /// Backend-declared schema for `table`, if known.
129    async fn schema(&self, _table: &str) -> Option<SchemaRef> {
130        None
131    }
132}