uni_plugin/traits/storage.rs
1//! Storage backend plugins — `MATCH` / `CREATE` against pluggable stores.
2//!
3//! M5a (2026-05-24): traits are `#[async_trait]`. Real backends such as the
4//! `lance://` adapter in `uni-plugin-builtin` need to drive async I/O
5//! through `uni-store`; the plugin surface mirrors that shape so adapters
6//! don't have to fabricate a blocking runtime per call.
7
8use std::sync::Arc;
9
10use arrow_schema::SchemaRef;
11use async_trait::async_trait;
12use datafusion::arrow::record_batch::RecordBatch;
13use datafusion::execution::SendableRecordBatchStream;
14use datafusion::logical_expr::Expr;
15
16use crate::errors::FnError;
17
/// Options passed at backend open time.
///
/// `Default` yields an empty `config_json` string. `PartialEq`/`Eq` are
/// derived so callers and tests can compare option values directly.
#[derive(Clone, Debug, Default, PartialEq, Eq)]
pub struct StorageOptions {
    /// Free-form JSON configuration.
    pub config_json: String,
}
24
/// Opaque write handle returned by [`Storage::write_batch`].
///
/// `PartialEq`/`Eq`/`Hash` are derived so handles can be compared in
/// assertions and used as keys when tracking outstanding writes.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct WriteHandle {
    /// Backend-specific identifier (LSN, transaction id, …).
    pub id: u64,
}
31
/// Metadata returned by [`Storage::fork`] describing the newly-created branch.
///
/// `parent_version` is the backend version pinned as the fork-point, so
/// callers orchestrating nested forks can chain `create_branch_from`-style
/// calls without re-querying the backend. `branch_name` echoes the
/// `dst_branch` argument, surfaced explicitly so backends with name
/// canonicalization can return the resolved form.
///
/// `PartialEq`/`Eq` are derived so fork results can be compared directly
/// (for example in tests asserting the returned fork-point and name).
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct BranchMetadata {
    /// Backend version pinned as the new branch's fork-point.
    pub parent_version: u64,
    /// Branch identifier as registered on the backend.
    pub branch_name: String,
}
46
47/// A storage backend identified by URI scheme.
48#[async_trait]
49pub trait StorageBackend: Send + Sync {
50 /// URI scheme this backend handles (`"lance"`, `"s3"`, `"memory"`).
51 fn scheme(&self) -> &'static str;
52
53 /// Open the backend at `uri`.
54 ///
55 /// # Errors
56 ///
57 /// Returns [`FnError`] if the URI is malformed or the backend cannot
58 /// be opened (auth failure, network error).
59 async fn open(&self, uri: &str, options: &StorageOptions) -> Result<Arc<dyn Storage>, FnError>;
60}
61
62/// Per-instance storage interface.
63#[async_trait]
64pub trait Storage: Send + Sync {
65 /// Stream batches from `table` matching `predicate`.
66 ///
67 /// `predicate = None` means a full scan.
68 ///
69 /// # Errors
70 ///
71 /// Returns [`FnError`] if the read cannot start.
72 async fn read_batch(
73 &self,
74 table: &str,
75 predicate: Option<&Expr>,
76 ) -> Result<SendableRecordBatchStream, FnError>;
77
78 /// Write a single batch to `table`.
79 ///
80 /// # Errors
81 ///
82 /// Returns [`FnError`] on write failure.
83 async fn write_batch(&self, table: &str, batch: &RecordBatch) -> Result<WriteHandle, FnError>;
84
85 /// List tables known to this backend.
86 ///
87 /// # Errors
88 ///
89 /// Returns [`FnError`] if the listing cannot complete.
90 async fn list_tables(&self) -> Result<Vec<String>, FnError>;
91
92 /// Delete rows in `table` matching `predicate`. Returns the number of
93 /// rows actually deleted.
94 ///
95 /// # Errors
96 ///
97 /// Returns [`FnError`] on delete failure.
98 async fn delete(&self, table: &str, predicate: &Expr) -> Result<u64, FnError>;
99
100 /// Whether this backend supports branched / forked state.
101 fn supports_branching(&self) -> bool {
102 false
103 }
104
105 /// Fork `src_branch` of `table` into `dst_branch`. Default: unsupported.
106 ///
107 /// Granularity is per-dataset (`table`) because real branching backends
108 /// (Lance) track branches and versions independently per dataset.
109 /// Multi-dataset orchestration (atomic across all tables of a logical
110 /// fork) is the caller's responsibility.
111 ///
112 /// # Errors
113 ///
114 /// Returns [`FnError`] if branching is not supported or the fork
115 /// operation fails (missing source branch, name collision, I/O).
116 async fn fork(
117 &self,
118 _table: &str,
119 _src_branch: &str,
120 _dst_branch: &str,
121 ) -> Result<BranchMetadata, FnError> {
122 Err(FnError::new(
123 0x10,
124 "storage backend does not support branching",
125 ))
126 }
127
128 /// Backend-declared schema for `table`, if known.
129 async fn schema(&self, _table: &str) -> Option<SchemaRef> {
130 None
131 }
132}