1use std::any::Any;
6use std::collections::HashMap;
7use std::error::Error;
8use std::future::Future;
9
10use anyhow::Result;
11#[cfg(target_arch = "wasm32")]
12use anyhow::{Context, anyhow};
13use bytes::Bytes;
14use http::{Request, Response};
15use http_body::Body;
16use omnia_wasi_sql::{DataType, Row};
17
18pub trait Config: Send + Sync {
21 #[cfg(not(target_arch = "wasm32"))]
23 fn get(&self, key: &str) -> impl Future<Output = Result<String>> + Send;
24
25 #[cfg(target_arch = "wasm32")]
27 fn get(&self, key: &str) -> impl Future<Output = Result<String>> + Send {
28 async move {
29 let config = omnia_wasi_config::store::get(key).context("getting configuration")?;
30 config.ok_or_else(|| anyhow!("configuration not found"))
31 }
32 }
33}
34
35pub trait HttpRequest: Send + Sync {
37 #[cfg(not(target_arch = "wasm32"))]
39 fn fetch<T>(&self, request: Request<T>) -> impl Future<Output = Result<Response<Bytes>>> + Send
40 where
41 T: Body + Any + Send,
42 T::Data: Into<Vec<u8>>,
43 T::Error: Into<Box<dyn Error + Send + Sync + 'static>>;
44
45 #[cfg(target_arch = "wasm32")]
47 fn fetch<T>(&self, request: Request<T>) -> impl Future<Output = Result<Response<Bytes>>> + Send
48 where
49 T: Body + Any + Send,
50 T::Data: Into<Vec<u8>>,
51 T::Error: Into<Box<dyn Error + Send + Sync + 'static>>,
52 {
53 async move { omnia_wasi_http::handle(request).await }
54 }
55}
56
57#[derive(Clone, Debug)]
59pub struct Message {
60 pub payload: Vec<u8>,
62 pub headers: HashMap<String, String>,
64}
65
66impl Message {
67 #[must_use]
69 pub fn new(payload: &[u8]) -> Self {
70 Self {
71 payload: payload.to_vec(),
72 headers: HashMap::new(),
73 }
74 }
75}
76
77pub trait Publish: Send + Sync {
79 #[cfg(not(target_arch = "wasm32"))]
81 fn send(&self, topic: &str, message: &Message) -> impl Future<Output = Result<()>> + Send;
82
83 #[cfg(target_arch = "wasm32")]
85 fn send(&self, topic: &str, message: &Message) -> impl Future<Output = Result<()>> + Send {
86 use omnia_wasi_messaging::producer;
87 use omnia_wasi_messaging::types::{self as wasi, Client};
88
89 async move {
90 let client =
91 Client::connect("host".to_string()).await.context("connecting to broker")?;
92 producer::send(&client, topic.to_string(), wasi::Message::new(&message.payload))
93 .await
94 .with_context(|| format!("sending message to {topic}"))
95 }
96 }
97}
98
99pub trait StateStore: Send + Sync {
101 #[cfg(not(target_arch = "wasm32"))]
103 fn get(&self, key: &str) -> impl Future<Output = Result<Option<Vec<u8>>>> + Send;
104
105 #[cfg(not(target_arch = "wasm32"))]
107 fn set(
108 &self, key: &str, value: &[u8], ttl_secs: Option<u64>,
109 ) -> impl Future<Output = Result<Option<Vec<u8>>>> + Send;
110
111 #[cfg(not(target_arch = "wasm32"))]
113 fn delete(&self, key: &str) -> impl Future<Output = Result<()>> + Send;
114
115 #[cfg(target_arch = "wasm32")]
117 fn get(&self, key: &str) -> impl Future<Output = Result<Option<Vec<u8>>>> + Send {
118 async move {
119 let bucket =
120 omnia_wasi_keyvalue::cache::open("cache").await.context("opening cache")?;
121 bucket.get(key).await.context("reading state from cache")
122 }
123 }
124
125 #[cfg(target_arch = "wasm32")]
127 fn set(
128 &self, key: &str, value: &[u8], ttl_secs: Option<u64>,
129 ) -> impl Future<Output = Result<Option<Vec<u8>>>> + Send {
130 async move {
131 let bucket =
132 omnia_wasi_keyvalue::cache::open("cache").await.context("opening cache")?;
133 bucket.set(key, value, ttl_secs).await.context("reading state from cache")
134 }
135 }
136
137 #[cfg(target_arch = "wasm32")]
139 fn delete(&self, key: &str) -> impl Future<Output = Result<()>> + Send {
140 async move {
141 let bucket =
142 omnia_wasi_keyvalue::cache::open("cache").await.context("opening cache")?;
143 bucket.delete(key).await.context("deleting entry from cache")
144 }
145 }
146}
147
148pub trait Identity: Send + Sync {
150 #[cfg(not(target_arch = "wasm32"))]
152 fn access_token(&self, identity: String) -> impl Future<Output = Result<String>> + Send;
153
154 #[cfg(target_arch = "wasm32")]
156 fn access_token(&self, identity: String) -> impl Future<Output = Result<String>> + Send {
157 use omnia_wasi_identity::credentials::get_identity;
158
159 async move {
160 let identity = wit_bindgen::block_on(get_identity(identity))?;
161 let access_token =
162 wit_bindgen::block_on(async move { identity.get_token(vec![]).await })?;
163 Ok(access_token.token)
164 }
165 }
166}
167
168pub trait TableStore: Send + Sync {
173 #[cfg(not(target_arch = "wasm32"))]
175 fn query(
176 &self, cnn_name: String, query: String, params: Vec<DataType>,
177 ) -> impl Future<Output = Result<Vec<Row>>> + Send;
178
179 #[cfg(not(target_arch = "wasm32"))]
181 fn exec(
182 &self, cnn_name: String, query: String, params: Vec<DataType>,
183 ) -> impl Future<Output = Result<u32>> + Send;
184
185 #[cfg(target_arch = "wasm32")]
191 fn query(
192 &self, cnn_name: String, query: String, params: Vec<DataType>,
193 ) -> impl Future<Output = Result<Vec<Row>>> + Send {
194 use omnia_wasi_sql::types::{Connection, Statement};
195 async move {
196 let cnn = Connection::open(cnn_name)
197 .await
198 .map_err(|e| anyhow!("failed to open connection: {}", e.trace()))?;
199
200 let stmt = Statement::prepare(query, params)
201 .await
202 .map_err(|e| anyhow!("failed to prepare statement: {}", e.trace()))?;
203
204 let res = omnia_wasi_sql::readwrite::query(&cnn, &stmt)
205 .await
206 .map_err(|e| anyhow!("query failed: {}", e.trace()))?;
207
208 Ok(res)
209 }
210 }
211
212 #[cfg(target_arch = "wasm32")]
218 fn exec(
219 &self, cnn_name: String, query: String, params: Vec<DataType>,
220 ) -> impl Future<Output = Result<u32>> + Send {
221 use omnia_wasi_sql::types::{Connection, Statement};
222 async move {
223 let cnn = Connection::open(cnn_name)
224 .await
225 .map_err(|e| anyhow!("failed to open connection: {}", e.trace()))?;
226
227 let stmt = Statement::prepare(query, params)
228 .await
229 .map_err(|e| anyhow!("failed to prepare statement: {}", e.trace()))?;
230
231 let res = omnia_wasi_sql::readwrite::exec(&cnn, &stmt)
232 .await
233 .map_err(|e| anyhow!("exec failed: {}", e.trace()))?;
234
235 Ok(res)
236 }
237 }
238}
239
240pub trait DocumentStore: Send + Sync {
244 #[cfg(not(target_arch = "wasm32"))]
246 fn get(
247 &self, store: &str, id: &str,
248 ) -> impl Future<Output = Result<Option<crate::document_store::Document>>> + Send;
249
250 #[cfg(not(target_arch = "wasm32"))]
252 fn insert(
253 &self, store: &str, doc: &crate::document_store::Document,
254 ) -> impl Future<Output = Result<()>> + Send;
255
256 #[cfg(not(target_arch = "wasm32"))]
258 fn put(
259 &self, store: &str, doc: &crate::document_store::Document,
260 ) -> impl Future<Output = Result<()>> + Send;
261
262 #[cfg(not(target_arch = "wasm32"))]
264 fn delete(&self, store: &str, id: &str) -> impl Future<Output = Result<bool>> + Send;
265
266 #[cfg(not(target_arch = "wasm32"))]
268 fn query(
269 &self, store: &str, options: crate::document_store::QueryOptions,
270 ) -> impl Future<Output = Result<crate::document_store::QueryResult>> + Send;
271
272 #[cfg(target_arch = "wasm32")]
274 fn get(
275 &self, store: &str, id: &str,
276 ) -> impl Future<Output = Result<Option<crate::document_store::Document>>> + Send {
277 async move { omnia_wasi_jsondb::store::get(store, id).await }
278 }
279
280 #[cfg(target_arch = "wasm32")]
282 fn insert(
283 &self, store: &str, doc: &crate::document_store::Document,
284 ) -> impl Future<Output = Result<()>> + Send {
285 async move { omnia_wasi_jsondb::store::insert(store, doc).await }
286 }
287
288 #[cfg(target_arch = "wasm32")]
290 fn put(
291 &self, store: &str, doc: &crate::document_store::Document,
292 ) -> impl Future<Output = Result<()>> + Send {
293 async move { omnia_wasi_jsondb::store::put(store, doc).await }
294 }
295
296 #[cfg(target_arch = "wasm32")]
298 fn delete(&self, store: &str, id: &str) -> impl Future<Output = Result<bool>> + Send {
299 async move { omnia_wasi_jsondb::store::delete(store, id).await }
300 }
301
302 #[cfg(target_arch = "wasm32")]
304 fn query(
305 &self, store: &str, options: crate::document_store::QueryOptions,
306 ) -> impl Future<Output = Result<crate::document_store::QueryResult>> + Send {
307 async move { omnia_wasi_jsondb::store::query(store, options).await }
308 }
309}
310
311pub trait Broadcast: Send + Sync {
314 #[cfg(not(target_arch = "wasm32"))]
316 fn send(
317 &self, name: &str, data: &[u8], sockets: Option<Vec<String>>,
318 ) -> impl Future<Output = Result<()>> + Send;
319
320 #[cfg(target_arch = "wasm32")]
322 fn send(
323 &self, name: &str, data: &[u8], sockets: Option<Vec<String>>,
324 ) -> impl Future<Output = Result<()>> + Send {
325 async move {
326 let client = omnia_wasi_websocket::types::Client::connect(name.to_string())
327 .await
328 .map_err(|e| anyhow!("connecting to websocket: {e}"))?;
329 let event = omnia_wasi_websocket::types::Event::new(data);
330 omnia_wasi_websocket::client::send(&client, event, sockets)
331 .await
332 .map_err(|e| anyhow!("sending websocket event: {e}"))
333 }
334 }
335}
336
337#[derive(Clone, Debug)]
341pub struct ContainerMetadata {
342 pub name: String,
344 pub created_at: u64,
346}
347
348#[derive(Clone, Debug)]
352pub struct ObjectMetadata {
353 pub name: String,
355 pub container: String,
357 pub created_at: u64,
359 pub size: u64,
361}
362
363pub trait BlobStore: Send + Sync {
368 #[cfg(not(target_arch = "wasm32"))]
374 fn get(
375 &self, container: &str, name: &str,
376 ) -> impl Future<Output = Result<Option<Vec<u8>>>> + Send;
377
378 #[cfg(not(target_arch = "wasm32"))]
380 fn put(
381 &self, container: &str, name: &str, data: &[u8],
382 ) -> impl Future<Output = Result<()>> + Send;
383
384 #[cfg(not(target_arch = "wasm32"))]
386 fn delete(&self, container: &str, name: &str) -> impl Future<Output = Result<()>> + Send;
387
388 #[cfg(not(target_arch = "wasm32"))]
390 fn has(&self, container: &str, name: &str) -> impl Future<Output = Result<bool>> + Send;
391
392 #[cfg(not(target_arch = "wasm32"))]
394 fn list(&self, container: &str) -> impl Future<Output = Result<Vec<String>>> + Send;
395
396 #[cfg(not(target_arch = "wasm32"))]
400 fn get_range(
401 &self, container: &str, name: &str, start: u64, end: u64,
402 ) -> impl Future<Output = Result<Vec<u8>>> + Send;
403
404 #[cfg(not(target_arch = "wasm32"))]
406 fn object_info(
407 &self, container: &str, name: &str,
408 ) -> impl Future<Output = Result<ObjectMetadata>> + Send;
409
410 #[cfg(not(target_arch = "wasm32"))]
412 fn delete_objects(
413 &self, container: &str, names: &[String],
414 ) -> impl Future<Output = Result<()>> + Send;
415
416 #[cfg(not(target_arch = "wasm32"))]
418 fn clear(&self, container: &str) -> impl Future<Output = Result<()>> + Send;
419
420 #[cfg(not(target_arch = "wasm32"))]
426 fn create_container(&self, name: &str) -> impl Future<Output = Result<()>> + Send;
427
428 #[cfg(not(target_arch = "wasm32"))]
430 fn delete_container(&self, name: &str) -> impl Future<Output = Result<()>> + Send;
431
432 #[cfg(not(target_arch = "wasm32"))]
434 fn container_exists(&self, name: &str) -> impl Future<Output = Result<bool>> + Send;
435
436 #[cfg(not(target_arch = "wasm32"))]
438 fn container_info(
439 &self, container: &str,
440 ) -> impl Future<Output = Result<ContainerMetadata>> + Send;
441
442 #[cfg(not(target_arch = "wasm32"))]
451 fn copy_object(
452 &self, src_container: &str, src_name: &str, dest_container: &str, dest_name: &str,
453 ) -> impl Future<Output = Result<()>> + Send;
454
455 #[cfg(not(target_arch = "wasm32"))]
460 fn move_object(
461 &self, src_container: &str, src_name: &str, dest_container: &str, dest_name: &str,
462 ) -> impl Future<Output = Result<()>> + Send;
463
464 #[cfg(target_arch = "wasm32")]
470 fn get(
471 &self, container: &str, name: &str,
472 ) -> impl Future<Output = Result<Option<Vec<u8>>>> + Send {
473 use omnia_wasi_blobstore::blobstore;
474 use omnia_wasi_blobstore::types::IncomingValue;
475
476 async move {
477 let ctr = blobstore::get_container(container.to_string())
478 .await
479 .map_err(|e| anyhow!("opening container: {e}"))?;
480 if !ctr
481 .has_object(name.to_string())
482 .await
483 .map_err(|e| anyhow!("checking object existence: {e}"))?
484 {
485 return Ok(None);
486 }
487 let incoming = ctr
488 .get_data(name.to_string(), 0, u64::MAX)
489 .await
490 .map_err(|e| anyhow!("reading object: {e}"))?;
491 let data = IncomingValue::incoming_value_consume_sync(incoming)
492 .map_err(|e| anyhow!("consuming incoming value: {e}"))?;
493 Ok(Some(data))
494 }
495 }
496
497 #[cfg(target_arch = "wasm32")]
499 fn put(
500 &self, container: &str, name: &str, data: &[u8],
501 ) -> impl Future<Output = Result<()>> + Send {
502 use omnia_wasi_blobstore::blobstore;
503 use omnia_wasi_blobstore::types::OutgoingValue;
504
505 async move {
506 let ctr = blobstore::get_container(container.to_string())
507 .await
508 .map_err(|e| anyhow!("opening container: {e}"))?;
509 let outgoing = OutgoingValue::new_outgoing_value();
510 {
511 let body = outgoing
512 .outgoing_value_write_body()
513 .await
514 .map_err(|e| anyhow!("getting write body: {e:?}"))?;
515 body.blocking_write_and_flush(data).map_err(|e| anyhow!("writing data: {e}"))?;
516 };
517 ctr.write_data(name.to_string(), &outgoing)
518 .await
519 .map_err(|e| anyhow!("writing object: {e}"))?;
520 OutgoingValue::finish(outgoing).map_err(|e| anyhow!("finishing write: {e}"))?;
521 Ok(())
522 }
523 }
524
525 #[cfg(target_arch = "wasm32")]
527 fn delete(&self, container: &str, name: &str) -> impl Future<Output = Result<()>> + Send {
528 use omnia_wasi_blobstore::blobstore;
529
530 async move {
531 let ctr = blobstore::get_container(container.to_string())
532 .await
533 .map_err(|e| anyhow!("opening container: {e}"))?;
534 ctr.delete_object(name.to_string()).await.map_err(|e| anyhow!("deleting object: {e}"))
535 }
536 }
537
538 #[cfg(target_arch = "wasm32")]
540 fn has(&self, container: &str, name: &str) -> impl Future<Output = Result<bool>> + Send {
541 use omnia_wasi_blobstore::blobstore;
542
543 async move {
544 let ctr = blobstore::get_container(container.to_string())
545 .await
546 .map_err(|e| anyhow!("opening container: {e}"))?;
547 ctr.has_object(name.to_string())
548 .await
549 .map_err(|e| anyhow!("checking object existence: {e}"))
550 }
551 }
552
553 #[cfg(target_arch = "wasm32")]
555 fn list(&self, container: &str) -> impl Future<Output = Result<Vec<String>>> + Send {
556 use omnia_wasi_blobstore::blobstore;
557
558 async move {
559 let ctr = blobstore::get_container(container.to_string())
560 .await
561 .map_err(|e| anyhow!("opening container: {e}"))?;
562 let stream = ctr.list_objects().await.map_err(|e| anyhow!("listing objects: {e}"))?;
563 let mut names = Vec::new();
564 loop {
565 let (batch, done) = stream
566 .read_stream_object_names(100)
567 .await
568 .map_err(|e| anyhow!("reading object names: {e}"))?;
569 names.extend(batch);
570 if done {
571 break;
572 }
573 }
574 Ok(names)
575 }
576 }
577
578 #[cfg(target_arch = "wasm32")]
582 fn get_range(
583 &self, container: &str, name: &str, start: u64, end: u64,
584 ) -> impl Future<Output = Result<Vec<u8>>> + Send {
585 use omnia_wasi_blobstore::blobstore;
586 use omnia_wasi_blobstore::types::IncomingValue;
587
588 async move {
589 let ctr = blobstore::get_container(container.to_string())
590 .await
591 .map_err(|e| anyhow!("opening container: {e}"))?;
592 let incoming = ctr
593 .get_data(name.to_string(), start, end)
594 .await
595 .map_err(|e| anyhow!("reading object range: {e}"))?;
596 let data = IncomingValue::incoming_value_consume_sync(incoming)
597 .map_err(|e| anyhow!("consuming incoming value: {e}"))?;
598 Ok(data)
599 }
600 }
601
602 #[cfg(target_arch = "wasm32")]
604 fn object_info(
605 &self, container: &str, name: &str,
606 ) -> impl Future<Output = Result<ObjectMetadata>> + Send {
607 use omnia_wasi_blobstore::blobstore;
608
609 async move {
610 let ctr = blobstore::get_container(container.to_string())
611 .await
612 .map_err(|e| anyhow!("opening container: {e}"))?;
613 let info = ctr
614 .object_info(name.to_string())
615 .await
616 .map_err(|e| anyhow!("getting object info: {e}"))?;
617 Ok(ObjectMetadata {
618 name: info.name,
619 container: info.container,
620 created_at: info.created_at,
621 size: info.size,
622 })
623 }
624 }
625
626 #[cfg(target_arch = "wasm32")]
628 fn delete_objects(
629 &self, container: &str, names: &[String],
630 ) -> impl Future<Output = Result<()>> + Send {
631 use omnia_wasi_blobstore::blobstore;
632
633 let names = names.to_vec();
634 async move {
635 let ctr = blobstore::get_container(container.to_string())
636 .await
637 .map_err(|e| anyhow!("opening container: {e}"))?;
638 ctr.delete_objects(names).await.map_err(|e| anyhow!("deleting objects: {e}"))
639 }
640 }
641
642 #[cfg(target_arch = "wasm32")]
644 fn clear(&self, container: &str) -> impl Future<Output = Result<()>> + Send {
645 use omnia_wasi_blobstore::blobstore;
646
647 async move {
648 let ctr = blobstore::get_container(container.to_string())
649 .await
650 .map_err(|e| anyhow!("opening container: {e}"))?;
651 ctr.clear().await.map_err(|e| anyhow!("clearing container: {e}"))
652 }
653 }
654
655 #[cfg(target_arch = "wasm32")]
657 fn create_container(&self, name: &str) -> impl Future<Output = Result<()>> + Send {
658 use omnia_wasi_blobstore::blobstore;
659
660 async move {
661 let _container = blobstore::create_container(name.to_string())
662 .await
663 .map_err(|e| anyhow!("creating container: {e}"))?;
664 Ok(())
665 }
666 }
667
668 #[cfg(target_arch = "wasm32")]
670 fn delete_container(&self, name: &str) -> impl Future<Output = Result<()>> + Send {
671 use omnia_wasi_blobstore::blobstore;
672
673 async move {
674 blobstore::delete_container(name.to_string())
675 .await
676 .map_err(|e| anyhow!("deleting container: {e}"))
677 }
678 }
679
680 #[cfg(target_arch = "wasm32")]
682 fn container_exists(&self, name: &str) -> impl Future<Output = Result<bool>> + Send {
683 use omnia_wasi_blobstore::blobstore;
684
685 async move {
686 blobstore::container_exists(name.to_string())
687 .await
688 .map_err(|e| anyhow!("checking container existence: {e}"))
689 }
690 }
691
692 #[cfg(target_arch = "wasm32")]
694 fn container_info(
695 &self, container: &str,
696 ) -> impl Future<Output = Result<ContainerMetadata>> + Send {
697 use omnia_wasi_blobstore::blobstore;
698
699 async move {
700 let ctr = blobstore::get_container(container.to_string())
701 .await
702 .map_err(|e| anyhow!("opening container: {e}"))?;
703 let info = ctr.info().map_err(|e| anyhow!("getting container info: {e}"))?;
704 Ok(ContainerMetadata {
705 name: info.name,
706 created_at: info.created_at,
707 })
708 }
709 }
710
711 #[cfg(target_arch = "wasm32")]
716 fn copy_object(
717 &self, src_container: &str, src_name: &str, dest_container: &str, dest_name: &str,
718 ) -> impl Future<Output = Result<()>> + Send {
719 use omnia_wasi_blobstore::blobstore;
720 use omnia_wasi_blobstore::types::ObjectId;
721
722 async move {
723 let src = ObjectId {
724 container: src_container.to_string(),
725 object: src_name.to_string(),
726 };
727 let dest = ObjectId {
728 container: dest_container.to_string(),
729 object: dest_name.to_string(),
730 };
731 blobstore::copy_object(src, dest).await.map_err(|e| anyhow!("copying object: {e}"))
732 }
733 }
734
735 #[cfg(target_arch = "wasm32")]
740 fn move_object(
741 &self, src_container: &str, src_name: &str, dest_container: &str, dest_name: &str,
742 ) -> impl Future<Output = Result<()>> + Send {
743 use omnia_wasi_blobstore::blobstore;
744 use omnia_wasi_blobstore::types::ObjectId;
745
746 async move {
747 let src = ObjectId {
748 container: src_container.to_string(),
749 object: src_name.to_string(),
750 };
751 let dest = ObjectId {
752 container: dest_container.to_string(),
753 object: dest_name.to_string(),
754 };
755 blobstore::move_object(src, dest).await.map_err(|e| anyhow!("moving object: {e}"))
756 }
757 }
758}