Skip to main content

cloudillo_types/
rtdb_adapter.rs

1//! Real-Time Database Adapter
2//!
3//! Trait and types for pluggable real-time database backends that store JSON documents
4//! using hierarchical path-based access (e.g., `posts/abc123/comments/xyz789`).
5//!
6//! Read operations (query, get, subscribe) work directly on the adapter.
7//! Write operations (create, update, delete) require a transaction for atomicity.
8//!
9//! Each adapter implementation provides its own constructor handling backend-specific
10//! initialization (database path, connection settings, etc.).
11
12use async_trait::async_trait;
13use futures_core::Stream;
14use serde::{Deserialize, Serialize};
15use serde_json::Value;
16use std::collections::HashMap;
17use std::fmt::Debug;
18use std::pin::Pin;
19
20use crate::prelude::*;
21
22/// Lock mode for document locking.
23#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
24#[serde(rename_all = "camelCase")]
25pub enum LockMode {
26	Soft,
27	Hard,
28}
29
/// Information about an active lock on a document path.
///
/// Returned by [`RtdbAdapter::acquire_lock`] (when the request is denied) and by
/// [`RtdbAdapter::check_lock`].
#[derive(Debug, Clone)]
pub struct LockInfo {
	/// Identifier of the user holding the lock.
	pub user_id: Box<str>,
	/// Mode the lock was acquired with.
	pub mode: LockMode,
	/// Moment the lock was acquired.
	/// NOTE(review): presumably a Unix timestamp; epoch and unit are not visible here — confirm.
	pub acquired_at: u64,
	/// Lock time-to-live, in seconds (per the field name).
	pub ttl_secs: u64,
}
38
/// An aggregation operation to compute per group.
///
/// Serialized as an internally tagged object whose `op` key holds the camelCase
/// variant name, e.g. `{"op": "sum", "field": "price"}`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "op", rename_all = "camelCase")]
pub enum AggregateOp {
	/// Sum of `field` within the group.
	Sum { field: String },
	/// Average of `field` within the group.
	Avg { field: String },
	/// Minimum of `field` within the group.
	Min { field: String },
	/// Maximum of `field` within the group.
	Max { field: String },
}
48
/// Aggregation options: group by a field and compute statistics.
///
/// Keys are serialized in camelCase (`groupBy`, `ops`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct AggregateOptions {
	/// Field to group by. For array fields, each element becomes a separate group.
	pub group_by: String,

	/// Additional operations per group (count is always included implicitly).
	///
	/// Defaults to an empty list when absent on input and is omitted from the
	/// serialized output when empty.
	#[serde(default, skip_serializing_if = "Vec::is_empty")]
	pub ops: Vec<AggregateOp>,
}
60
61/// Query filter for selecting documents.
62///
63/// Supports multiple filter operations on JSON document fields.
64/// A document matches if ALL specified conditions are satisfied (AND logic).
65#[derive(Debug, Clone, Default, Serialize, Deserialize)]
66pub struct QueryFilter {
67	/// Field equality constraints: field_name -> expected_value
68	#[serde(default, skip_serializing_if = "HashMap::is_empty")]
69	pub equals: HashMap<String, Value>,
70
71	/// Field not-equal constraints: field_name -> expected_value
72	#[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "notEquals")]
73	pub not_equals: HashMap<String, Value>,
74
75	/// Field greater-than constraints: field_name -> threshold_value
76	#[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "greaterThan")]
77	pub greater_than: HashMap<String, Value>,
78
79	/// Field greater-than-or-equal constraints: field_name -> threshold_value
80	#[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "greaterThanOrEqual")]
81	pub greater_than_or_equal: HashMap<String, Value>,
82
83	/// Field less-than constraints: field_name -> threshold_value
84	#[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "lessThan")]
85	pub less_than: HashMap<String, Value>,
86
87	/// Field less-than-or-equal constraints: field_name -> threshold_value
88	#[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "lessThanOrEqual")]
89	pub less_than_or_equal: HashMap<String, Value>,
90
91	/// Field in-array constraints: field_name -> array of allowed values
92	#[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "inArray")]
93	pub in_array: HashMap<String, Vec<Value>>,
94
95	/// Array-contains constraints: field_name -> value that must be in the array field
96	#[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "arrayContains")]
97	pub array_contains: HashMap<String, Value>,
98
99	/// Not-in-array constraints: field_name -> array of excluded values
100	#[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "notInArray")]
101	pub not_in_array: HashMap<String, Vec<Value>>,
102
103	/// Array-contains-any constraints: field_name -> array of values (at least one must be in the array field)
104	#[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "arrayContainsAny")]
105	pub array_contains_any: HashMap<String, Vec<Value>>,
106
107	/// Array-contains-all constraints: field_name -> array of values (all must be in the array field)
108	#[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "arrayContainsAll")]
109	pub array_contains_all: HashMap<String, Vec<Value>>,
110}
111
112impl QueryFilter {
113	/// Create a new empty filter (matches all documents).
114	pub fn new() -> Self {
115		Self::default()
116	}
117
118	/// Create a filter with a single equality constraint.
119	pub fn equals_one(field: impl Into<String>, value: Value) -> Self {
120		let mut equals = HashMap::new();
121		equals.insert(field.into(), value);
122		Self { equals, ..Default::default() }
123	}
124
125	/// Add an equality constraint to this filter (builder pattern).
126	pub fn with_equals(mut self, field: impl Into<String>, value: Value) -> Self {
127		self.equals.insert(field.into(), value);
128		self
129	}
130
131	/// Add a not-equal constraint to this filter (builder pattern).
132	pub fn with_not_equals(mut self, field: impl Into<String>, value: Value) -> Self {
133		self.not_equals.insert(field.into(), value);
134		self
135	}
136
137	/// Add a greater-than constraint to this filter (builder pattern).
138	pub fn with_greater_than(mut self, field: impl Into<String>, value: Value) -> Self {
139		self.greater_than.insert(field.into(), value);
140		self
141	}
142
143	/// Add a greater-than-or-equal constraint to this filter (builder pattern).
144	pub fn with_greater_than_or_equal(mut self, field: impl Into<String>, value: Value) -> Self {
145		self.greater_than_or_equal.insert(field.into(), value);
146		self
147	}
148
149	/// Add a less-than constraint to this filter (builder pattern).
150	pub fn with_less_than(mut self, field: impl Into<String>, value: Value) -> Self {
151		self.less_than.insert(field.into(), value);
152		self
153	}
154
155	/// Add a less-than-or-equal constraint to this filter (builder pattern).
156	pub fn with_less_than_or_equal(mut self, field: impl Into<String>, value: Value) -> Self {
157		self.less_than_or_equal.insert(field.into(), value);
158		self
159	}
160
161	/// Add an in-array constraint to this filter (builder pattern).
162	pub fn with_in_array(mut self, field: impl Into<String>, values: Vec<Value>) -> Self {
163		self.in_array.insert(field.into(), values);
164		self
165	}
166
167	/// Add an array-contains constraint to this filter (builder pattern).
168	pub fn with_array_contains(mut self, field: impl Into<String>, value: Value) -> Self {
169		self.array_contains.insert(field.into(), value);
170		self
171	}
172
173	/// Add a not-in-array constraint to this filter (builder pattern).
174	pub fn with_not_in_array(mut self, field: impl Into<String>, values: Vec<Value>) -> Self {
175		self.not_in_array.insert(field.into(), values);
176		self
177	}
178
179	/// Add an array-contains-any constraint to this filter (builder pattern).
180	pub fn with_array_contains_any(mut self, field: impl Into<String>, values: Vec<Value>) -> Self {
181		self.array_contains_any.insert(field.into(), values);
182		self
183	}
184
185	/// Add an array-contains-all constraint to this filter (builder pattern).
186	pub fn with_array_contains_all(mut self, field: impl Into<String>, values: Vec<Value>) -> Self {
187		self.array_contains_all.insert(field.into(), values);
188		self
189	}
190
191	/// Check if a document matches this filter (all conditions must be satisfied).
192	pub fn matches(&self, doc: &Value) -> bool {
193		// Equality checks
194		for (field, expected) in &self.equals {
195			if doc.get(field) != Some(expected) {
196				return false;
197			}
198		}
199
200		// Not-equal checks (missing fields are inherently "not equal")
201		for (field, expected) in &self.not_equals {
202			if doc.get(field) == Some(expected) {
203				return false;
204			}
205		}
206
207		// Greater-than checks
208		for (field, threshold) in &self.greater_than {
209			match doc.get(field) {
210				Some(actual)
211					if compare_json_values(Some(actual), Some(threshold))
212						== std::cmp::Ordering::Greater => {}
213				_ => return false,
214			}
215		}
216
217		// Greater-than-or-equal checks
218		for (field, threshold) in &self.greater_than_or_equal {
219			match doc.get(field) {
220				Some(actual) => {
221					let ord = compare_json_values(Some(actual), Some(threshold));
222					if ord != std::cmp::Ordering::Greater && ord != std::cmp::Ordering::Equal {
223						return false;
224					}
225				}
226				_ => return false,
227			}
228		}
229
230		// Less-than checks
231		for (field, threshold) in &self.less_than {
232			match doc.get(field) {
233				Some(actual)
234					if compare_json_values(Some(actual), Some(threshold))
235						== std::cmp::Ordering::Less => {}
236				_ => return false,
237			}
238		}
239
240		// Less-than-or-equal checks
241		for (field, threshold) in &self.less_than_or_equal {
242			match doc.get(field) {
243				Some(actual) => {
244					let ord = compare_json_values(Some(actual), Some(threshold));
245					if ord != std::cmp::Ordering::Less && ord != std::cmp::Ordering::Equal {
246						return false;
247					}
248				}
249				_ => return false,
250			}
251		}
252
253		// In-array checks (field value must be in the provided array)
254		for (field, allowed_values) in &self.in_array {
255			match doc.get(field) {
256				Some(actual) if allowed_values.contains(actual) => {}
257				_ => return false,
258			}
259		}
260
261		// Array-contains checks (field must be an array containing the value)
262		for (field, required_value) in &self.array_contains {
263			match doc.get(field) {
264				Some(Value::Array(arr)) if arr.contains(required_value) => {}
265				_ => return false,
266			}
267		}
268
269		// Not-in-array checks (field value must NOT be in the provided array; missing fields pass)
270		for (field, excluded_values) in &self.not_in_array {
271			if let Some(actual) = doc.get(field) {
272				if excluded_values.contains(actual) {
273					return false;
274				}
275			}
276		}
277
278		// Array-contains-any checks
279		for (field, candidate_values) in &self.array_contains_any {
280			match doc.get(field) {
281				Some(Value::Array(arr)) if candidate_values.iter().any(|v| arr.contains(v)) => {}
282				_ => return false,
283			}
284		}
285
286		// Array-contains-all checks
287		for (field, required_values) in &self.array_contains_all {
288			match doc.get(field) {
289				Some(Value::Array(arr)) if required_values.iter().all(|v| arr.contains(v)) => {}
290				_ => return false,
291			}
292		}
293
294		true
295	}
296
297	/// Check if this filter is empty (matches all documents).
298	pub fn is_empty(&self) -> bool {
299		self.equals.is_empty()
300			&& self.not_equals.is_empty()
301			&& self.greater_than.is_empty()
302			&& self.greater_than_or_equal.is_empty()
303			&& self.less_than.is_empty()
304			&& self.less_than_or_equal.is_empty()
305			&& self.in_array.is_empty()
306			&& self.array_contains.is_empty()
307			&& self.not_in_array.is_empty()
308			&& self.array_contains_any.is_empty()
309			&& self.array_contains_all.is_empty()
310	}
311}
312
/// Sort order for a field.
///
/// Build one with [`SortField::asc`] or [`SortField::desc`]. Several `SortField`s
/// can be combined in [`QueryOptions::sort`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SortField {
	/// Field name to sort by
	pub field: String,

	/// Sort direction: true for ascending, false for descending
	pub ascending: bool,
}
322
323impl SortField {
324	/// Create ascending sort order.
325	pub fn asc(field: impl Into<String>) -> Self {
326		Self { field: field.into(), ascending: true }
327	}
328
329	/// Create descending sort order.
330	pub fn desc(field: impl Into<String>) -> Self {
331		Self { field: field.into(), ascending: false }
332	}
333}
334
/// Options for querying documents (filter, sort, limit, offset).
///
/// Every field is optional; the `Default` value has all of them unset. The
/// `with_*` builder methods set one field each.
#[derive(Debug, Clone, Default)]
pub struct QueryOptions {
	/// Optional filter to select documents
	pub filter: Option<QueryFilter>,

	/// Optional sort order (multiple fields supported)
	pub sort: Option<Vec<SortField>>,

	/// Optional limit on number of results
	pub limit: Option<u32>,

	/// Optional offset for pagination
	pub offset: Option<u32>,

	/// When set, returns aggregated groups instead of documents.
	pub aggregate: Option<AggregateOptions>,
}
353
354impl QueryOptions {
355	/// Create new empty query options (no filter, sort, or limit).
356	pub fn new() -> Self {
357		Self::default()
358	}
359
360	/// Set the filter.
361	pub fn with_filter(mut self, filter: QueryFilter) -> Self {
362		self.filter = Some(filter);
363		self
364	}
365
366	/// Set the sort order.
367	pub fn with_sort(mut self, sort: Vec<SortField>) -> Self {
368		self.sort = Some(sort);
369		self
370	}
371
372	/// Set the limit.
373	pub fn with_limit(mut self, limit: u32) -> Self {
374		self.limit = Some(limit);
375		self
376	}
377
378	/// Set the offset.
379	pub fn with_offset(mut self, offset: u32) -> Self {
380		self.offset = Some(offset);
381		self
382	}
383
384	/// Set the aggregation options.
385	pub fn with_aggregate(mut self, aggregate: AggregateOptions) -> Self {
386		self.aggregate = Some(aggregate);
387		self
388	}
389}
390
/// Options for subscribing to real-time changes.
///
/// Passed to [`RtdbAdapter::subscribe`]. Build one with
/// [`SubscriptionOptions::all`] or [`SubscriptionOptions::filtered`].
#[derive(Debug, Clone)]
pub struct SubscriptionOptions {
	/// Path to subscribe to (e.g., "posts", "posts/abc123/comments")
	pub path: Box<str>,

	/// Optional filter (only matching changes are sent)
	pub filter: Option<QueryFilter>,
}
400
401impl SubscriptionOptions {
402	/// Create a subscription to all changes at a path.
403	pub fn all(path: impl Into<Box<str>>) -> Self {
404		Self { path: path.into(), filter: None }
405	}
406
407	/// Create a subscription with a filter.
408	pub fn filtered(path: impl Into<Box<str>>, filter: QueryFilter) -> Self {
409		Self { path: path.into(), filter: Some(filter) }
410	}
411}
412
/// Real-time change event emitted when a document is created, updated, or deleted.
///
/// Also carries lock/unlock notifications and the `Ready` marker of a subscription.
///
/// Serialized as an internally tagged object: the `action` key holds the camelCase
/// variant name (`"create"`, `"update"`, `"delete"`, `"lock"`, `"unlock"`, `"ready"`).
///
/// NOTE(review): the enum-level `rename_all` renames the variant tags only, not the
/// fields inside each variant, so `old_data` is written and read as `old_data`
/// (not `oldData`). Confirm this is what clients expect before relying on it.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "action", rename_all = "camelCase")]
pub enum ChangeEvent {
	/// A new document was created
	Create {
		/// Full path to the document (e.g., "posts/abc123" or "posts/abc123/comments/xyz789")
		path: Box<str>,
		/// Full document data
		data: Value,
	},

	/// An existing document was updated
	Update {
		/// Full path to the document
		path: Box<str>,
		/// Full updated document data
		data: Value,
		/// Previous document data (for incremental aggregate computation).
		/// Omitted from the serialized form when `None`.
		#[serde(default, skip_serializing_if = "Option::is_none")]
		old_data: Option<Value>,
	},

	/// A document was deleted
	Delete {
		/// Full path to the document
		path: Box<str>,
		/// Document data before deletion (for incremental aggregate computation).
		/// Omitted from the serialized form when `None`.
		#[serde(default, skip_serializing_if = "Option::is_none")]
		old_data: Option<Value>,
	},

	/// A lock was acquired on a document path
	Lock {
		/// Full path to the locked document
		path: Box<str>,
		/// Lock metadata (userId, mode)
		data: Value,
	},

	/// A lock was released on a document path
	Unlock {
		/// Full path to the unlocked document
		path: Box<str>,
		/// Unlock metadata (userId)
		data: Value,
	},

	/// Signals that all initial documents have been yielded for a subscription
	Ready {
		/// Subscription path
		path: Box<str>,
		/// Optional initial dataset. Omitted from the serialized form when `None`.
		#[serde(default, skip_serializing_if = "Option::is_none")]
		data: Option<Value>,
	},
}
470
471impl ChangeEvent {
472	/// Get the full path from this event.
473	pub fn path(&self) -> &str {
474		match self {
475			ChangeEvent::Create { path, .. }
476			| ChangeEvent::Update { path, .. }
477			| ChangeEvent::Delete { path, .. }
478			| ChangeEvent::Lock { path, .. }
479			| ChangeEvent::Unlock { path, .. }
480			| ChangeEvent::Ready { path, .. } => path,
481		}
482	}
483
484	/// Get the document ID (last segment of the path).
485	pub fn id(&self) -> Option<&str> {
486		self.path().split('/').next_back()
487	}
488
489	/// Get the parent path (all segments except the last).
490	pub fn parent_path(&self) -> Option<&str> {
491		let path = self.path();
492		path.rfind('/').map(|pos| &path[..pos])
493	}
494
495	/// Get the document data if this is a Create or Update event.
496	pub fn data(&self) -> Option<&Value> {
497		match self {
498			ChangeEvent::Create { data, .. }
499			| ChangeEvent::Update { data, .. }
500			| ChangeEvent::Lock { data, .. }
501			| ChangeEvent::Unlock { data, .. } => Some(data),
502			ChangeEvent::Delete { .. } => None,
503			ChangeEvent::Ready { data, .. } => data.as_ref(),
504		}
505	}
506
507	/// Check if this is a Create event.
508	pub fn is_create(&self) -> bool {
509		matches!(self, ChangeEvent::Create { .. })
510	}
511
512	/// Check if this is an Update event.
513	pub fn is_update(&self) -> bool {
514		matches!(self, ChangeEvent::Update { .. })
515	}
516
517	/// Check if this is a Delete event.
518	pub fn is_delete(&self) -> bool {
519		matches!(self, ChangeEvent::Delete { .. })
520	}
521}
522
523/// Compare two JSON values for ordering (used by filter range operators).
524fn compare_json_values(a: Option<&Value>, b: Option<&Value>) -> std::cmp::Ordering {
525	match (a, b) {
526		(None, None) => std::cmp::Ordering::Equal,
527		(None, Some(_)) => std::cmp::Ordering::Less,
528		(Some(_), None) => std::cmp::Ordering::Greater,
529		(Some(Value::Number(a)), Some(Value::Number(b))) => {
530			a.as_f64().partial_cmp(&b.as_f64()).unwrap_or(std::cmp::Ordering::Equal)
531		}
532		(Some(Value::String(a)), Some(Value::String(b))) => a.cmp(b),
533		(Some(Value::Bool(a)), Some(Value::Bool(b))) => a.cmp(b),
534		(Some(a), Some(b)) => a.to_string().cmp(&b.to_string()),
535	}
536}
537
538/// Convert a JSON value to a string key for aggregate group indexing.
539pub fn value_to_group_string(value: &Value) -> String {
540	match value {
541		Value::String(s) => s.clone(),
542		Value::Number(n) => n.to_string(),
543		Value::Bool(b) => b.to_string(),
544		Value::Null => "null".to_string(),
545		_ => serde_json::to_string(value).unwrap_or_default(),
546	}
547}
548
/// Database statistics.
///
/// Returned by [`RtdbAdapter::stats`]. No serde rename is applied, so the
/// serialized keys are the snake_case field names below.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DbStats {
	/// Total size of database files in bytes
	pub size_bytes: u64,

	/// Total number of documents across all tables
	pub record_count: u64,

	/// Number of tables in the database
	pub table_count: u32,
}
561
/// Transaction for atomic write operations.
///
/// All write operations must be performed within a transaction to ensure atomicity.
/// Obtain one from [`RtdbAdapter::transaction`], perform writes, then call
/// [`Transaction::commit`] or [`Transaction::rollback`].
///
/// NOTE(review): what happens when a transaction is dropped without either call is
/// decided by the implementation and is not specified here.
#[async_trait]
pub trait Transaction: Send + Sync {
	/// Create a new document with auto-generated ID. Returns the generated ID.
	///
	/// NOTE(review): `path` is presumably the collection path under which the
	/// document is created (e.g. `posts`) — confirm against an implementation.
	async fn create(&mut self, path: &str, data: Value) -> ClResult<Box<str>>;

	/// Update an existing document (stores the provided data as-is).
	///
	/// Note: This method performs a full document replacement at the storage level.
	/// Merge/PATCH semantics should be handled by the caller before invoking this method.
	async fn update(&mut self, path: &str, data: Value) -> ClResult<()>;

	/// Delete a document at a path.
	async fn delete(&mut self, path: &str) -> ClResult<()>;

	/// Read a document from the transaction's view.
	///
	/// This method provides transaction-local reads with "read-your-own-writes" semantics:
	/// - Returns uncommitted changes made by this transaction
	/// - Provides snapshot isolation from other concurrent transactions
	/// - Essential for atomic operations like increment, append, etc.
	///
	/// # Returns
	/// - `Ok(Some(value))` if document exists (either committed or written by this transaction)
	/// - `Ok(None)` if document doesn't exist or was deleted by this transaction
	/// - `Err` if read operation fails
	async fn get(&self, path: &str) -> ClResult<Option<Value>>;

	/// Commit the transaction, applying all changes atomically.
	async fn commit(&mut self) -> ClResult<()>;

	/// Rollback the transaction, discarding all changes.
	async fn rollback(&mut self) -> ClResult<()>;
}
598
/// Real-Time Database Adapter trait.
///
/// Unified interface for database backends. Provides transaction-based writes,
/// queries, and real-time subscriptions.
///
/// Every method is addressed by a `(tn_id, db_id)` pair.
/// NOTE(review): `tn_id` is presumably the tenant and `db_id` the database
/// instance within that tenant — confirm against `TnId`'s definition.
#[async_trait]
pub trait RtdbAdapter: Debug + Send + Sync {
	/// Begin a new transaction for write operations.
	async fn transaction(&self, tn_id: TnId, db_id: &str) -> ClResult<Box<dyn Transaction>>;

	/// Close a database instance, flushing pending changes to disk.
	async fn close_db(&self, tn_id: TnId, db_id: &str) -> ClResult<()>;

	/// Query documents at a path with optional filtering, sorting, and pagination.
	///
	/// When `opts.aggregate` is set, the result holds aggregated groups instead of
	/// documents (see [`QueryOptions::aggregate`]).
	async fn query(
		&self,
		tn_id: TnId,
		db_id: &str,
		path: &str,
		opts: QueryOptions,
	) -> ClResult<Vec<Value>>;

	/// Get a document at a specific path. Returns None if not found.
	async fn get(&self, tn_id: TnId, db_id: &str, path: &str) -> ClResult<Option<Value>>;

	/// Subscribe to real-time changes at a path. Returns a stream of ChangeEvents.
	///
	/// The path and optional filter come from `opts`.
	async fn subscribe(
		&self,
		tn_id: TnId,
		db_id: &str,
		opts: SubscriptionOptions,
	) -> ClResult<Pin<Box<dyn Stream<Item = ChangeEvent> + Send>>>;

	/// Create an index on a field to improve query performance.
	async fn create_index(&self, tn_id: TnId, db_id: &str, path: &str, field: &str)
		-> ClResult<()>;

	/// Get database statistics (size, record count, table count).
	async fn stats(&self, tn_id: TnId, db_id: &str) -> ClResult<DbStats>;

	/// Export all documents from a database.
	///
	/// Returns all `(path, document)` pairs. The path is relative to the db_id
	/// (e.g., `posts/abc123`). Used for duplicating RTDB files.
	async fn export_all(&self, tn_id: TnId, db_id: &str) -> ClResult<Vec<(Box<str>, Value)>>;

	/// Acquire a lock on a document path.
	///
	/// Returns `Ok(None)` if the lock was acquired successfully.
	/// Returns `Ok(Some(LockInfo))` if the path is already locked by another user (denied).
	///
	/// NOTE(review): `conn_id` presumably identifies the client connection that
	/// owns the lock, so that locks can be scoped per connection — confirm.
	async fn acquire_lock(
		&self,
		tn_id: TnId,
		db_id: &str,
		path: &str,
		user_id: &str,
		mode: LockMode,
		conn_id: &str,
	) -> ClResult<Option<LockInfo>>;

	/// Release a lock on a document path.
	async fn release_lock(
		&self,
		tn_id: TnId,
		db_id: &str,
		path: &str,
		user_id: &str,
		conn_id: &str,
	) -> ClResult<()>;

	/// Check if a path has an active lock. Returns the lock info if locked.
	async fn check_lock(&self, tn_id: TnId, db_id: &str, path: &str) -> ClResult<Option<LockInfo>>;

	/// Release all locks held by a specific user (called on disconnect).
	///
	/// NOTE(review): the signature also takes `conn_id`; whether release is scoped
	/// to that connection or to the whole user is up to the implementation — confirm.
	async fn release_all_locks(
		&self,
		tn_id: TnId,
		db_id: &str,
		user_id: &str,
		conn_id: &str,
	) -> ClResult<()>;
}
680
681// vim: ts=4