use crate::{DataResult, DataStore, Equivalent};
use serde_json::{json, Value};
use std::any::Any;
use std::cell::RefCell;
use std::collections::VecDeque;
use std::io::{Error, Result};
use std::rc::Rc;
use wasm_bindgen::prelude::*;
use wasm_bindgen_futures::spawn_local;
use web_sys::{IdbDatabase, IdbRequest};
const DB_VERSION: u32 = 1;
const STORE_NAME: &str = "events";
#[derive(Clone)]
pub struct WebConfig {
pub write_key: String,
pub database_name: String,
pub max_items: usize,
pub max_fetch_size: usize,
}
#[derive(Clone, Debug)]
struct StoredEvent {
idb_key: Option<u32>,
value: Value,
}
impl Equivalent for StoredEvent {
fn equals(&self, other: &dyn Equivalent) -> bool {
if let Some(other_event) = other.as_any().downcast_ref::<StoredEvent>() {
match (&self.idb_key, &other_event.idb_key) {
(Some(a), Some(b)) => a == b,
_ => self.value == other_event.value,
}
} else {
false
}
}
fn as_any(&self) -> &dyn Any {
self
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum PersistenceState {
Persisted,
MemoryOnly,
}
pub struct WebStore {
config: WebConfig,
items: VecDeque<StoredEvent>,
db: Option<Rc<IdbDatabase>>,
temp_key_counter: u32,
persistence_state: PersistenceState,
}
impl WebStore {
pub async fn new(config: WebConfig) -> Self {
if config.max_fetch_size < 100 {
panic!("max_fetch_size < 100 bytes? What are you even trying to fetch, empty arrays?");
}
if config.max_items == 0 {
panic!("max_items = 0? So... you want a store that stores nothing? That's what /dev/null is for.");
}
let mut store = Self {
config,
items: VecDeque::new(),
db: None,
temp_key_counter: 0,
persistence_state: PersistenceState::MemoryOnly,
};
match store.open_database().await {
Ok(db) => {
store.db = Some(Rc::new(db));
store.persistence_state = PersistenceState::Persisted;
if let Err(e) = store.hydrate().await {
web_sys::console::warn_1(
&format!("Failed to hydrate from IndexedDB, starting fresh: {:?}", e)
.into(),
);
}
}
Err(e) => {
web_sys::console::warn_1(
&format!(
"IndexedDB unavailable ({}), falling back to memory-only storage. \
Events will not persist across page refreshes. \
Consider increasing flush frequency.",
e
)
.into(),
);
}
}
store
}
pub fn persistence_state(&self) -> PersistenceState {
self.persistence_state
}
pub fn is_persisted(&self) -> bool {
self.persistence_state == PersistenceState::Persisted
}
async fn open_database(&self) -> Result<IdbDatabase> {
let window = web_sys::window().ok_or_else(|| Error::other("No window object"))?;
let idb_factory = window
.indexed_db()
.map_err(|e| Error::other(format!("IndexedDB error: {:?}", e)))?
.ok_or_else(|| Error::other("IndexedDB not available"))?;
let open_request = idb_factory
.open_with_f64(&self.config.database_name, DB_VERSION as f64)
.map_err(|e| Error::other(format!("Failed to open DB: {:?}", e)))?;
let on_upgrade = Closure::once(move |event: web_sys::IdbVersionChangeEvent| {
let target = event.target().unwrap();
let request: IdbRequest = target.unchecked_into();
let db: IdbDatabase = request.result().unwrap().unchecked_into();
if !db.object_store_names().contains(STORE_NAME) {
let params = web_sys::IdbObjectStoreParameters::new();
params.set_auto_increment(true);
params.set_key_path(&JsValue::from_str("_idb_key"));
db.create_object_store_with_optional_parameters(STORE_NAME, ¶ms)
.expect("Failed to create object store");
}
});
open_request.set_onupgradeneeded(Some(on_upgrade.as_ref().unchecked_ref()));
on_upgrade.forget();
let db = Self::await_request::<IdbDatabase>(&open_request).await?;
Ok(db)
}
async fn hydrate(&mut self) -> Result<()> {
let db = match &self.db {
Some(db) => db.clone(),
None => return Ok(()), };
let transaction = db
.transaction_with_str_and_mode(STORE_NAME, web_sys::IdbTransactionMode::Readonly)
.map_err(|e| Error::other(format!("Transaction error: {:?}", e)))?;
let store = transaction
.object_store(STORE_NAME)
.map_err(|e| Error::other(format!("Object store error: {:?}", e)))?;
let request = store
.get_all()
.map_err(|e| Error::other(format!("GetAll error: {:?}", e)))?;
let result = Self::await_request::<JsValue>(&request).await?;
if let Ok(array) = result.dyn_into::<js_sys::Array>() {
for item in array.iter() {
if let Ok(obj) = js_sys::JSON::stringify(&item) {
if let Ok(s) = obj.as_string().ok_or(()) {
if let Ok(mut value) = serde_json::from_str::<Value>(&s) {
let idb_key = value
.get("_idb_key")
.and_then(|k| k.as_u64())
.map(|k| k as u32);
if let Some(obj) = value.as_object_mut() {
obj.remove("_idb_key");
}
self.items.push_back(StoredEvent { idb_key, value });
}
}
}
}
}
if let Some(max_key) = self.items.iter().filter_map(|e| e.idb_key).max() {
self.temp_key_counter = max_key + 1;
}
Ok(())
}
fn persist_event(&self, event: StoredEvent) {
let Some(db) = &self.db else { return };
let db = db.clone();
let write_key = self.config.write_key.clone();
spawn_local(async move {
if let Err(e) = Self::write_to_idb(&db, &write_key, &event).await {
web_sys::console::warn_1(&format!("IndexedDB write failed: {:?}", e).into());
}
});
}
async fn write_to_idb(db: &IdbDatabase, _write_key: &str, event: &StoredEvent) -> Result<()> {
let transaction = db
.transaction_with_str_and_mode(STORE_NAME, web_sys::IdbTransactionMode::Readwrite)
.map_err(|e| Error::other(format!("Transaction error: {:?}", e)))?;
let store = transaction
.object_store(STORE_NAME)
.map_err(|e| Error::other(format!("Object store error: {:?}", e)))?;
let json_str = serde_json::to_string(&event.value)
.map_err(|e| Error::other(format!("JSON error: {:?}", e)))?;
let js_value = js_sys::JSON::parse(&json_str)
.map_err(|e| Error::other(format!("JS JSON parse error: {:?}", e)))?;
let request = store
.add(&js_value)
.map_err(|e| Error::other(format!("Add error: {:?}", e)))?;
Self::await_request::<JsValue>(&request).await?;
Ok(())
}
fn remove_from_idb(&self, idb_key: u32) {
let Some(db) = &self.db else { return };
let db = db.clone();
spawn_local(async move {
if let Err(e) = Self::delete_from_idb(&db, idb_key).await {
web_sys::console::warn_1(&format!("IndexedDB delete failed: {:?}", e).into());
}
});
}
async fn delete_from_idb(db: &IdbDatabase, idb_key: u32) -> Result<()> {
let transaction = db
.transaction_with_str_and_mode(STORE_NAME, web_sys::IdbTransactionMode::Readwrite)
.map_err(|e| Error::other(format!("Transaction error: {:?}", e)))?;
let store = transaction
.object_store(STORE_NAME)
.map_err(|e| Error::other(format!("Object store error: {:?}", e)))?;
let request = store
.delete(&JsValue::from(idb_key))
.map_err(|e| Error::other(format!("Delete error: {:?}", e)))?;
Self::await_request::<JsValue>(&request).await?;
Ok(())
}
async fn await_request<T: JsCast>(request: &IdbRequest) -> Result<T> {
let (sender, receiver) = futures_channel::oneshot::channel();
let sender = Rc::new(RefCell::new(Some(sender)));
let success_sender = sender.clone();
let onsuccess = Closure::once(move |_event: web_sys::Event| {
if let Some(sender) = success_sender.borrow_mut().take() {
let _ = sender.send(Ok(()));
}
});
let error_sender = sender.clone();
let onerror = Closure::once(move |_event: web_sys::Event| {
if let Some(sender) = error_sender.borrow_mut().take() {
let _ = sender.send(Err(Error::other("IndexedDB request failed")));
}
});
request.set_onsuccess(Some(onsuccess.as_ref().unchecked_ref()));
request.set_onerror(Some(onerror.as_ref().unchecked_ref()));
onsuccess.forget();
onerror.forget();
receiver
.await
.map_err(|_| Error::other("Channel closed"))??;
request
.result()
.map_err(|e| Error::other(format!("Result error: {:?}", e)))?
.dyn_into::<T>()
.map_err(|_| Error::other("Type cast failed"))
}
fn create_batch(&self, items: &[StoredEvent]) -> Value {
let values: Vec<&Value> = items.iter().map(|e| &e.value).collect();
json!({
"batch": values,
"sentAt": Self::now_rfc3339(),
"writeKey": self.config.write_key
})
}
fn now_rfc3339() -> String {
let date = js_sys::Date::new_0();
date.to_iso_string().into()
}
fn get_item_size(item: &StoredEvent) -> usize {
item.value.to_string().len()
}
}
impl DataStore for WebStore {
type Output = Value;
fn has_data(&self) -> bool {
!self.items.is_empty()
}
fn reset(&mut self) {
let items: Vec<StoredEvent> = self.items.drain(..).collect();
for item in items {
if let Some(key) = item.idb_key {
self.remove_from_idb(key);
}
}
}
fn append(&mut self, data: Value) -> Result<()> {
let event = StoredEvent {
idb_key: Some(self.temp_key_counter),
value: data,
};
self.temp_key_counter += 1;
self.items.push_back(event.clone());
while self.items.len() > self.config.max_items {
if let Some(removed) = self.items.pop_front() {
if let Some(key) = removed.idb_key {
self.remove_from_idb(key);
}
}
}
self.persist_event(event);
Ok(())
}
fn fetch(
&mut self,
count: Option<usize>,
max_bytes: Option<usize>,
) -> Result<Option<DataResult<Self::Output>>> {
let max_bytes = max_bytes.unwrap_or(self.config.max_fetch_size);
let mut accumulated_size = 0;
let mut num_items = 0;
for item in self.items.iter() {
let item_size = Self::get_item_size(item);
if accumulated_size + item_size > max_bytes {
break;
}
if let Some(count) = count {
if num_items >= count {
break;
}
}
accumulated_size += item_size;
num_items += 1;
}
if num_items == 0 {
return Ok(None);
}
let items: Vec<StoredEvent> = self.items.iter().take(num_items).cloned().collect();
let removable: Vec<Box<dyn Equivalent>> = items
.iter()
.map(|item| Box::new(item.clone()) as Box<dyn Equivalent>)
.collect();
let batch = self.create_batch(&items);
Ok(Some(DataResult {
data: Some(batch),
removable: Some(removable),
}))
}
fn remove(&mut self, data: &[Box<dyn Equivalent>]) -> Result<()> {
let keys_to_remove: Vec<u32> = self
.items
.iter()
.filter(|item| data.iter().any(|removable| removable.equals(*item)))
.filter_map(|item| item.idb_key)
.collect();
self.items
.retain(|item| !data.iter().any(|removable| removable.equals(item)));
for key in keys_to_remove {
self.remove_from_idb(key);
}
Ok(())
}
}
#[cfg(all(test, target_arch = "wasm32"))]
mod tests {
use super::*;
use wasm_bindgen_test::*;
wasm_bindgen_test_configure!(run_in_browser);
fn test_config(db_name: &str) -> WebConfig {
WebConfig {
write_key: "test-key".to_string(),
database_name: db_name.to_string(),
max_items: 1000,
max_fetch_size: 1024,
}
}
#[wasm_bindgen_test]
async fn test_basic_operations() {
let mut store = WebStore::new(test_config("test-basic-ops")).await;
assert!(!store.has_data());
let event = json!({"event": "test", "value": 123});
store.append(event.clone()).unwrap();
assert!(store.has_data());
if let Some(result) = store.fetch(None, None).unwrap() {
let batch: Value = result.data.unwrap();
let items = batch["batch"].as_array().unwrap();
assert_eq!(items.len(), 1);
assert_eq!(items[0]["value"], 123);
assert!(store.has_data());
if let Some(removable) = result.removable {
store.remove(&removable).unwrap();
assert!(!store.has_data());
} else {
panic!("Expected removable items but got none");
}
} else {
panic!("Expected data but got none");
}
}
#[wasm_bindgen_test]
async fn test_fifo_behavior() {
let config = WebConfig {
write_key: "test-key".to_string(),
database_name: "test-fifo".to_string(),
max_items: 3, max_fetch_size: 1024,
};
let mut store = WebStore::new(config).await;
for i in 0..5 {
store.append(json!({"index": i})).unwrap();
}
assert!(store.has_data());
if let Some(result) = store.fetch(None, None).unwrap() {
let batch: Value = result.data.unwrap();
let items = batch["batch"].as_array().unwrap();
assert_eq!(items.len(), 3);
assert_eq!(items[0]["index"], 2);
assert_eq!(items[1]["index"], 3);
assert_eq!(items[2]["index"], 4);
}
}
#[wasm_bindgen_test]
async fn test_fetch_count_limit() {
let mut store = WebStore::new(test_config("test-fetch-count")).await;
for i in 0..10 {
store.append(json!({"index": i})).unwrap();
}
if let Some(result) = store.fetch(Some(3), None).unwrap() {
let batch: Value = result.data.unwrap();
let items = batch["batch"].as_array().unwrap();
assert_eq!(items.len(), 3, "Count limit not respected");
}
}
#[wasm_bindgen_test]
async fn test_fetch_byte_limit() {
let config = WebConfig {
write_key: "test-key".to_string(),
database_name: "test-fetch-bytes".to_string(),
max_items: 100,
max_fetch_size: 1000,
};
let mut store = WebStore::new(config).await;
for i in 0..10 {
let padding = "x".repeat(50); store
.append(json!({
"index": i,
"padding": padding
}))
.unwrap();
}
if let Some(result) = store.fetch(None, Some(200)).unwrap() {
let batch: Value = result.data.unwrap();
let items = batch["batch"].as_array().unwrap();
assert!(items.len() <= 3, "Too many items for byte limit");
}
}
#[wasm_bindgen_test]
async fn test_reset() {
let mut store = WebStore::new(test_config("test-reset")).await;
for i in 0..5 {
store.append(json!({"index": i})).unwrap();
}
assert!(store.has_data());
store.reset();
assert!(!store.has_data());
}
#[wasm_bindgen_test]
async fn test_json_types() {
let mut store = WebStore::new(test_config("test-json-types")).await;
store.append(json!(null)).unwrap();
store.append(json!(true)).unwrap();
store.append(json!(42)).unwrap();
store.append(json!(42.5)).unwrap();
store.append(json!("string")).unwrap();
store.append(json!([1, 2, 3])).unwrap();
store.append(json!({"key": "value"})).unwrap();
if let Some(result) = store.fetch(None, None).unwrap() {
let batch: Value = result.data.unwrap();
let items = batch["batch"].as_array().unwrap();
assert_eq!(
items.len(),
7,
"All JSON types should be stored and retrieved"
);
}
}
#[wasm_bindgen_test]
async fn test_batch_metadata() {
let mut store = WebStore::new(test_config("test-batch-metadata")).await;
store.append(json!({"event": "test"})).unwrap();
if let Some(result) = store.fetch(None, None).unwrap() {
let batch: Value = result.data.unwrap();
assert!(batch.get("batch").is_some(), "Missing 'batch' field");
assert!(batch.get("sentAt").is_some(), "Missing 'sentAt' field");
assert_eq!(
batch.get("writeKey").and_then(Value::as_str),
Some("test-key"),
"Wrong writeKey"
);
}
}
#[wasm_bindgen_test]
async fn test_persistence_state() {
let store = WebStore::new(test_config("test-persistence-state")).await;
let state = store.persistence_state();
assert!(
state == PersistenceState::Persisted || state == PersistenceState::MemoryOnly,
"Invalid persistence state"
);
assert_eq!(store.is_persisted(), state == PersistenceState::Persisted);
}
#[wasm_bindgen_test]
async fn test_hydration_across_instances() {
let db_name = "test-hydration";
{
let mut store = WebStore::new(WebConfig {
write_key: "test-key".to_string(),
database_name: db_name.to_string(),
max_items: 1000,
max_fetch_size: 1024,
})
.await;
if !store.is_persisted() {
web_sys::console::log_1(&"Skipping hydration test - no persistence".into());
return;
}
store
.append(json!({"event": "persisted_event", "value": 42}))
.unwrap();
gloo_timers::future::TimeoutFuture::new(100).await;
}
{
let mut store = WebStore::new(WebConfig {
write_key: "test-key".to_string(),
database_name: db_name.to_string(),
max_items: 1000,
max_fetch_size: 1024,
})
.await;
assert!(store.has_data(), "Data should be hydrated from IndexedDB");
if let Some(result) = store.fetch(None, None).unwrap() {
let batch: Value = result.data.unwrap();
let items = batch["batch"].as_array().unwrap();
assert_eq!(items.len(), 1);
assert_eq!(items[0]["event"], "persisted_event");
assert_eq!(items[0]["value"], 42);
if let Some(removable) = result.removable {
store.remove(&removable).unwrap();
}
}
}
}
#[wasm_bindgen_test]
async fn test_multiple_stores_isolated() {
let mut store_a = WebStore::new(WebConfig {
write_key: "key-a".to_string(),
database_name: "test-isolated-a".to_string(),
max_items: 1000,
max_fetch_size: 1024,
})
.await;
let mut store_b = WebStore::new(WebConfig {
write_key: "key-b".to_string(),
database_name: "test-isolated-b".to_string(),
max_items: 1000,
max_fetch_size: 1024,
})
.await;
store_a.append(json!({"store": "a"})).unwrap();
store_b.append(json!({"store": "b"})).unwrap();
if let Some(result) = store_a.fetch(None, None).unwrap() {
let batch: Value = result.data.unwrap();
let items = batch["batch"].as_array().unwrap();
assert_eq!(items.len(), 1);
assert_eq!(items[0]["store"], "a");
}
if let Some(result) = store_b.fetch(None, None).unwrap() {
let batch: Value = result.data.unwrap();
let items = batch["batch"].as_array().unwrap();
assert_eq!(items.len(), 1);
assert_eq!(items[0]["store"], "b");
}
}
#[wasm_bindgen_test]
#[should_panic(expected = "max_fetch_size < 100 bytes?")]
async fn test_rejects_tiny_max_fetch_size() {
let config = WebConfig {
write_key: "test-key".to_string(),
database_name: "test-panic".to_string(),
max_items: 1000,
max_fetch_size: 50,
};
let _store = WebStore::new(config).await;
}
#[wasm_bindgen_test]
#[should_panic(expected = "max_items = 0?")]
async fn test_rejects_zero_max_items() {
let config = WebConfig {
write_key: "test-key".to_string(),
database_name: "test-panic".to_string(),
max_items: 0,
max_fetch_size: 1024,
};
let _store = WebStore::new(config).await;
}
}