Skip to main content

omnia_sdk/
capabilities.rs

1//! # Provider
2//!
3//! Provider defines external data interfaces for the crate.
4
5use std::any::Any;
6use std::collections::HashMap;
7use std::error::Error;
8use std::future::Future;
9
10use anyhow::Result;
11#[cfg(target_arch = "wasm32")]
12use anyhow::{Context, anyhow};
13use bytes::Bytes;
14use http::{Request, Response};
15use http_body::Body;
16use omnia_wasi_sql::{DataType, Row};
17
18/// The `Config` trait is used by implementers to provide configuration from
19/// WASI-guest to dependent crates.
20pub trait Config: Send + Sync {
21    /// Get configuration setting.
22    #[cfg(not(target_arch = "wasm32"))]
23    fn get(&self, key: &str) -> impl Future<Output = Result<String>> + Send;
24
25    /// Get configuration setting.
26    #[cfg(target_arch = "wasm32")]
27    fn get(&self, key: &str) -> impl Future<Output = Result<String>> + Send {
28        async move {
29            let config = omnia_wasi_config::store::get(key).context("getting configuration")?;
30            config.ok_or_else(|| anyhow!("configuration not found"))
31        }
32    }
33}
34
35/// The `HttpRequest` trait defines the behavior for fetching data from a source.
36pub trait HttpRequest: Send + Sync {
37    /// Make outbound HTTP request.
38    #[cfg(not(target_arch = "wasm32"))]
39    fn fetch<T>(&self, request: Request<T>) -> impl Future<Output = Result<Response<Bytes>>> + Send
40    where
41        T: Body + Any + Send,
42        T::Data: Into<Vec<u8>>,
43        T::Error: Into<Box<dyn Error + Send + Sync + 'static>>;
44
45    /// Make outbound HTTP request.
46    #[cfg(target_arch = "wasm32")]
47    fn fetch<T>(&self, request: Request<T>) -> impl Future<Output = Result<Response<Bytes>>> + Send
48    where
49        T: Body + Any + Send,
50        T::Data: Into<Vec<u8>>,
51        T::Error: Into<Box<dyn Error + Send + Sync + 'static>>,
52    {
53        async move { omnia_wasi_http::handle(request).await }
54    }
55}
56
57/// Message represents a message to be published.
58#[derive(Clone, Debug)]
59pub struct Message {
60    /// The message payload.
61    pub payload: Vec<u8>,
62    /// The message headers.
63    pub headers: HashMap<String, String>,
64}
65
66impl Message {
67    /// Create a new message with the specified payload.
68    #[must_use]
69    pub fn new(payload: &[u8]) -> Self {
70        Self {
71            payload: payload.to_vec(),
72            headers: HashMap::new(),
73        }
74    }
75}
76
77/// The `Publisher` trait defines the message publishing behavior.
78pub trait Publish: Send + Sync {
79    /// Publish (send) a message to a topic.
80    #[cfg(not(target_arch = "wasm32"))]
81    fn send(&self, topic: &str, message: &Message) -> impl Future<Output = Result<()>> + Send;
82
83    /// Publish (send) a message to a topic.
84    #[cfg(target_arch = "wasm32")]
85    fn send(&self, topic: &str, message: &Message) -> impl Future<Output = Result<()>> + Send {
86        use omnia_wasi_messaging::producer;
87        use omnia_wasi_messaging::types::{self as wasi, Client};
88
89        async move {
90            let client =
91                Client::connect("host".to_string()).await.context("connecting to broker")?;
92            producer::send(&client, topic.to_string(), wasi::Message::new(&message.payload))
93                .await
94                .with_context(|| format!("sending message to {topic}"))
95        }
96    }
97}
98
99/// The `StateStore` trait defines the behavior storing and retrieving train state.
100pub trait StateStore: Send + Sync {
101    /// Retrieve a previously stored value from the state store.
102    #[cfg(not(target_arch = "wasm32"))]
103    fn get(&self, key: &str) -> impl Future<Output = Result<Option<Vec<u8>>>> + Send;
104
105    /// Store a value in the state store.
106    #[cfg(not(target_arch = "wasm32"))]
107    fn set(
108        &self, key: &str, value: &[u8], ttl_secs: Option<u64>,
109    ) -> impl Future<Output = Result<Option<Vec<u8>>>> + Send;
110
111    /// Delete a value from the state store.
112    #[cfg(not(target_arch = "wasm32"))]
113    fn delete(&self, key: &str) -> impl Future<Output = Result<()>> + Send;
114
115    /// Retrieve a previously stored value from the state store.
116    #[cfg(target_arch = "wasm32")]
117    fn get(&self, key: &str) -> impl Future<Output = Result<Option<Vec<u8>>>> + Send {
118        async move {
119            let bucket =
120                omnia_wasi_keyvalue::cache::open("cache").await.context("opening cache")?;
121            bucket.get(key).await.context("reading state from cache")
122        }
123    }
124
125    /// Store a value in the state store.
126    #[cfg(target_arch = "wasm32")]
127    fn set(
128        &self, key: &str, value: &[u8], ttl_secs: Option<u64>,
129    ) -> impl Future<Output = Result<Option<Vec<u8>>>> + Send {
130        async move {
131            let bucket =
132                omnia_wasi_keyvalue::cache::open("cache").await.context("opening cache")?;
133            bucket.set(key, value, ttl_secs).await.context("reading state from cache")
134        }
135    }
136
137    /// Delete a value from the state store.
138    #[cfg(target_arch = "wasm32")]
139    fn delete(&self, key: &str) -> impl Future<Output = Result<()>> + Send {
140        async move {
141            let bucket =
142                omnia_wasi_keyvalue::cache::open("cache").await.context("opening cache")?;
143            bucket.delete(key).await.context("deleting entry from cache")
144        }
145    }
146}
147
148/// The `Identity` trait defines behaviors for interacting with identity providers.
149pub trait Identity: Send + Sync {
150    /// Get an access token for the specified identity.
151    #[cfg(not(target_arch = "wasm32"))]
152    fn access_token(&self, identity: String) -> impl Future<Output = Result<String>> + Send;
153
154    /// Get an access token for the specified identity.
155    #[cfg(target_arch = "wasm32")]
156    fn access_token(&self, identity: String) -> impl Future<Output = Result<String>> + Send {
157        use omnia_wasi_identity::credentials::get_identity;
158
159        async move {
160            let identity = wit_bindgen::block_on(get_identity(identity))?;
161            let access_token =
162                wit_bindgen::block_on(async move { identity.get_token(vec![]).await })?;
163            Ok(access_token.token)
164        }
165    }
166}
167
168/// Trait for types that provide ORM database access.
169///
170/// Implement this trait to enable ORM operations. Default implementations
171/// use the WASI SQL bindings to execute queries.
172pub trait TableStore: Send + Sync {
173    /// Executes a query and returns the result rows.
174    #[cfg(not(target_arch = "wasm32"))]
175    fn query(
176        &self, cnn_name: String, query: String, params: Vec<DataType>,
177    ) -> impl Future<Output = Result<Vec<Row>>> + Send;
178
179    /// Executes a statement and returns the number of affected rows.
180    #[cfg(not(target_arch = "wasm32"))]
181    fn exec(
182        &self, cnn_name: String, query: String, params: Vec<DataType>,
183    ) -> impl Future<Output = Result<u32>> + Send;
184
185    /// Executes a query and returns the result rows.
186    ///
187    /// # Errors
188    ///
189    /// Returns an error if the connection fails, statement preparation fails, or query execution fails.
190    #[cfg(target_arch = "wasm32")]
191    fn query(
192        &self, cnn_name: String, query: String, params: Vec<DataType>,
193    ) -> impl Future<Output = Result<Vec<Row>>> + Send {
194        use omnia_wasi_sql::types::{Connection, Statement};
195        async move {
196            let cnn = Connection::open(cnn_name)
197                .await
198                .map_err(|e| anyhow!("failed to open connection: {}", e.trace()))?;
199
200            let stmt = Statement::prepare(query, params)
201                .await
202                .map_err(|e| anyhow!("failed to prepare statement: {}", e.trace()))?;
203
204            let res = omnia_wasi_sql::readwrite::query(&cnn, &stmt)
205                .await
206                .map_err(|e| anyhow!("query failed: {}", e.trace()))?;
207
208            Ok(res)
209        }
210    }
211
212    /// Executes a statement and returns the number of affected rows.
213    ///
214    /// # Errors
215    ///
216    /// Returns an error if the connection fails, statement preparation fails, or execution fails.
217    #[cfg(target_arch = "wasm32")]
218    fn exec(
219        &self, cnn_name: String, query: String, params: Vec<DataType>,
220    ) -> impl Future<Output = Result<u32>> + Send {
221        use omnia_wasi_sql::types::{Connection, Statement};
222        async move {
223            let cnn = Connection::open(cnn_name)
224                .await
225                .map_err(|e| anyhow!("failed to open connection: {}", e.trace()))?;
226
227            let stmt = Statement::prepare(query, params)
228                .await
229                .map_err(|e| anyhow!("failed to prepare statement: {}", e.trace()))?;
230
231            let res = omnia_wasi_sql::readwrite::exec(&cnn, &stmt)
232                .await
233                .map_err(|e| anyhow!("exec failed: {}", e.trace()))?;
234
235            Ok(res)
236        }
237    }
238}
239
240/// JSON document storage (WASI JSON DB).
241///
242/// Default WASM implementations delegate to `wasi:jsondb` via `omnia-wasi-jsondb`.
243pub trait DocumentStore: Send + Sync {
244    /// Fetch a document by id.
245    #[cfg(not(target_arch = "wasm32"))]
246    fn get(
247        &self, store: &str, id: &str,
248    ) -> impl Future<Output = Result<Option<crate::document_store::Document>>> + Send;
249
250    /// Insert a new document (fails if the id already exists).
251    #[cfg(not(target_arch = "wasm32"))]
252    fn insert(
253        &self, store: &str, doc: &crate::document_store::Document,
254    ) -> impl Future<Output = Result<()>> + Send;
255
256    /// Upsert a document by id.
257    #[cfg(not(target_arch = "wasm32"))]
258    fn put(
259        &self, store: &str, doc: &crate::document_store::Document,
260    ) -> impl Future<Output = Result<()>> + Send;
261
262    /// Delete a document by id. Returns whether a document was removed.
263    #[cfg(not(target_arch = "wasm32"))]
264    fn delete(&self, store: &str, id: &str) -> impl Future<Output = Result<bool>> + Send;
265
266    /// Query documents in a collection.
267    #[cfg(not(target_arch = "wasm32"))]
268    fn query(
269        &self, store: &str, options: crate::document_store::QueryOptions,
270    ) -> impl Future<Output = Result<crate::document_store::QueryResult>> + Send;
271
272    /// Fetch a document by id.
273    #[cfg(target_arch = "wasm32")]
274    fn get(
275        &self, store: &str, id: &str,
276    ) -> impl Future<Output = Result<Option<crate::document_store::Document>>> + Send {
277        async move { omnia_wasi_jsondb::store::get(store, id).await }
278    }
279
280    /// Insert a new document (fails if the id already exists).
281    #[cfg(target_arch = "wasm32")]
282    fn insert(
283        &self, store: &str, doc: &crate::document_store::Document,
284    ) -> impl Future<Output = Result<()>> + Send {
285        async move { omnia_wasi_jsondb::store::insert(store, doc).await }
286    }
287
288    /// Upsert a document by id.
289    #[cfg(target_arch = "wasm32")]
290    fn put(
291        &self, store: &str, doc: &crate::document_store::Document,
292    ) -> impl Future<Output = Result<()>> + Send {
293        async move { omnia_wasi_jsondb::store::put(store, doc).await }
294    }
295
296    /// Delete a document by id. Returns whether a document was removed.
297    #[cfg(target_arch = "wasm32")]
298    fn delete(&self, store: &str, id: &str) -> impl Future<Output = Result<bool>> + Send {
299        async move { omnia_wasi_jsondb::store::delete(store, id).await }
300    }
301
302    /// Query documents in a collection.
303    #[cfg(target_arch = "wasm32")]
304    fn query(
305        &self, store: &str, options: crate::document_store::QueryOptions,
306    ) -> impl Future<Output = Result<crate::document_store::QueryResult>> + Send {
307        async move { omnia_wasi_jsondb::store::query(store, options).await }
308    }
309}
310
311/// The `Broadcast` trait defines behavior for sending events to WebSocket
312/// or other broadcast channels.
313pub trait Broadcast: Send + Sync {
314    /// Send an event to connected WebSocket clients.
315    #[cfg(not(target_arch = "wasm32"))]
316    fn send(
317        &self, name: &str, data: &[u8], sockets: Option<Vec<String>>,
318    ) -> impl Future<Output = Result<()>> + Send;
319
320    /// Send an event to connected WebSocket clients.
321    #[cfg(target_arch = "wasm32")]
322    fn send(
323        &self, name: &str, data: &[u8], sockets: Option<Vec<String>>,
324    ) -> impl Future<Output = Result<()>> + Send {
325        async move {
326            let client = omnia_wasi_websocket::types::Client::connect(name.to_string())
327                .await
328                .map_err(|e| anyhow!("connecting to websocket: {e}"))?;
329            let event = omnia_wasi_websocket::types::Event::new(data);
330            omnia_wasi_websocket::client::send(&client, event, sockets)
331                .await
332                .map_err(|e| anyhow!("sending websocket event: {e}"))
333        }
334    }
335}