indexdb_es/event_repository.rs

use crate::{js_event::JsEvent, IndexDBAggregateError};
use async_trait::async_trait;
use cqrs_es::persist::{
    PersistedEventRepository, PersistenceError, ReplayStream, SerializedEvent, SerializedSnapshot,
};
use cqrs_es::Aggregate;
use futures::channel::oneshot::channel;
use gloo_utils::format::JsValueSerdeExt;
use idb::*;
use serde_json::Value;
use wasm_bindgen::prelude::*;
use wasm_bindgen_futures::spawn_local;

pub struct IndexDBEventRepository {
    db_name: String,
    store_name: String,
}

#[async_trait]
impl PersistedEventRepository for IndexDBEventRepository {
    async fn get_events<A: Aggregate>(
        &self,
        aggregate_id: &str,
    ) -> Result<Vec<SerializedEvent>, PersistenceError> {
        self.select_events::<A>(aggregate_id).await
    }

    async fn get_last_events<A: Aggregate>(
        &self,
        _aggregate_id: &str,
        _last_sequence: usize,
    ) -> Result<Vec<SerializedEvent>, PersistenceError> {
        // Not implemented yet: would return only events after `last_sequence`.
        todo!()
    }

    async fn get_snapshot<A: Aggregate>(
        &self,
        _aggregate_id: &str,
    ) -> Result<Option<SerializedSnapshot>, PersistenceError> {
        // Snapshots are not stored yet.
        todo!()
    }

    async fn persist<A: Aggregate>(
        &self,
        events: &[SerializedEvent],
        snapshot_update: Option<(String, Value, usize)>,
    ) -> Result<(), PersistenceError> {
        match snapshot_update {
            None => {
                self.insert_events::<A>(events).await?;
            }
            Some((_aggregate_id, _aggregate, _current_snapshot)) => {
                // Snapshot updates are not supported yet.
                todo!()
            }
        };
        Ok(())
    }

    async fn stream_events<A: Aggregate>(
        &self,
        _aggregate_id: &str,
    ) -> Result<ReplayStream, PersistenceError> {
        todo!()
    }

    async fn stream_all_events<A: Aggregate>(&self) -> Result<ReplayStream, PersistenceError> {
        todo!()
    }
}

impl IndexDBEventRepository {
    async fn select_events<A: Aggregate>(
        &self,
        aggregate_id: &str,
    ) -> Result<Vec<SerializedEvent>, PersistenceError> {
        // The idb futures are not `Send`, but `#[async_trait]` requires `Send` futures by
        // default, so the IndexedDB work runs on the local executor via `spawn_local` and
        // the result is handed back through a oneshot channel.
        let (sender, receiver) = channel::<Vec<SerializedEvent>>();

        let db_name = self.db_name.clone();
        let store_name = self.store_name.clone();
        let aggregate_id = aggregate_id.to_string();

        spawn_local(async move {
            let db = connect(&db_name).await;

            // This path only reads, so a read-only transaction is sufficient.
            let transaction = db
                .transaction(&[&store_name], TransactionMode::ReadOnly)
                .unwrap();

            let store = transaction.object_store(&store_name).unwrap();

            let index = store.index("aggregate_id").unwrap();

            let values = index
                .get_all(Some(Query::Key(aggregate_id.into())), None)
                .await
                .unwrap();

            let events: Vec<SerializedEvent> = values
                .into_iter()
                .map(|val| serde_wasm_bindgen::from_value::<JsEvent>(val).unwrap())
                .map(|js_event| js_event.into())
                .collect();

            sender.send(events).unwrap();
        });

        match receiver.await {
            Ok(result) => Ok(result),
            // The sender is only dropped if the spawned task panicked before sending.
            Err(_) => Err(PersistenceError::OptimisticLockError),
        }
    }
}

impl IndexDBEventRepository {
    pub fn new(db_name: Option<String>, store_name: Option<String>) -> Self {
        Self {
            db_name: db_name.unwrap_or_else(|| "cqrs".to_string()),
            store_name: store_name.unwrap_or_else(|| "events".to_string()),
        }
    }

    pub async fn insert_events<A: Aggregate>(
        &self,
        events: &[SerializedEvent],
    ) -> Result<(), IndexDBAggregateError> {
        // Same pattern as select_events: run the non-Send IndexedDB work on the local
        // executor and report the outcome through a oneshot channel.
        let (sender, receiver) = channel::<Result<(), IndexDBAggregateError>>();

        let db_name = self.db_name.clone();
        let store_name = self.store_name.clone();
        let events = events.to_vec();

        spawn_local(async move {
            let db = connect(&db_name).await;
            let events: Vec<JsValue> = events
                .into_iter()
                .map(JsEvent::from)
                .map(|e| JsValue::from_serde(&e).unwrap())
                .collect();

            let transaction = db
                .transaction(&[&store_name], TransactionMode::ReadWrite)
                .unwrap();

            let store = transaction.object_store(&store_name).unwrap();

            let mut res: Result<(), IndexDBAggregateError> = Ok(());
            for event in events {
                web_sys::console::log_1(&event);
                // A failed add means the (aggregate_type, aggregate_id, sequence) key
                // already exists, i.e. another writer committed this sequence first.
                if store.add(&event, None).await.is_err() {
                    res = Err(IndexDBAggregateError::OptimisticLock);
                    break;
                }
            }

            // Only commit the transaction when every event was added successfully.
            if res.is_ok() {
                transaction.commit().await.unwrap();
            }

            sender.send(res).unwrap();
        });

        match receiver.await {
            Ok(result) => result,
            Err(_) => Err(IndexDBAggregateError::OptimisticLock),
        }
    }
}

async fn connect(name: &str) -> Database {
    console_error_panic_hook::set_once();

    let factory = Factory::new().unwrap();

    let mut open_request = factory.open(name, Some(1)).unwrap();

    // Runs only on first open (or when the version is bumped): create the event store
    // keyed by (aggregate_type, aggregate_id, sequence) plus an index for lookups by
    // aggregate_id. Note that the store is always named "events" here, regardless of the
    // store_name passed to the repository.
    open_request.on_upgrade_needed(|event| {
        let database = event.database().unwrap();

        let mut store_params = ObjectStoreParams::new();
        store_params.key_path(Some(KeyPath::new_array(vec![
            "aggregate_type",
            "aggregate_id",
            "sequence",
        ])));

        let store = database
            .create_object_store("events", store_params)
            .unwrap();

        store
            .create_index("aggregate_id", KeyPath::new_single("aggregate_id"), None)
            .unwrap();
    });

    open_request.await.unwrap()
}
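
As a usage sketch (not part of the file above), the repository can be exercised directly through the `PersistedEventRepository` trait. In a full application it would typically be wrapped in cqrs_es's `PersistedEventStore` and driven through a `CqrsFramework`, but the minimal call below only assumes the API shown in this file plus its imports. `MyAggregate` is a hypothetical placeholder for any type implementing `cqrs_es::Aggregate`.

// Hypothetical usage sketch, assuming the same crate context and imports as
// event_repository.rs, and a placeholder aggregate type `MyAggregate`.
async fn load_events() {
    // Default database name ("cqrs") and object store ("events").
    let repo = IndexDBEventRepository::new(None, None);

    // Read back every event stored for one aggregate instance.
    let events = repo
        .get_events::<MyAggregate>("aggregate-123")
        .await
        .expect("failed to read events from IndexedDB");

    web_sys::console::log_1(&format!("loaded {} events", events.len()).into());
}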