Skip to main content

cloudillo_types/
rtdb_adapter.rs

1//! Real-Time Database Adapter
2//!
3//! Trait and types for pluggable real-time database backends that store JSON documents
4//! using hierarchical path-based access (e.g., `posts/abc123/comments/xyz789`).
5//!
6//! Read operations (query, get, subscribe) work directly on the adapter.
7//! Write operations (create, update, delete) require a transaction for atomicity.
8//!
9//! Each adapter implementation provides its own constructor handling backend-specific
10//! initialization (database path, connection settings, etc.).
11
12use async_trait::async_trait;
13use futures_core::Stream;
14use serde::{Deserialize, Serialize};
15use serde_json::Value;
16use std::collections::HashMap;
17use std::fmt::Debug;
18use std::pin::Pin;
19
20use crate::prelude::*;
21use crate::types::TnId;
22
23/// Lock mode for document locking.
24#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
25#[serde(rename_all = "camelCase")]
26pub enum LockMode {
27	Soft,
28	Hard,
29}
30
31/// Information about an active lock on a document path.
32#[derive(Debug, Clone)]
33pub struct LockInfo {
34	pub user_id: Box<str>,
35	pub mode: LockMode,
36	pub acquired_at: u64,
37	pub ttl_secs: u64,
38}
39
40/// An aggregation operation to compute per group.
41#[derive(Debug, Clone, Serialize, Deserialize)]
42#[serde(tag = "op", rename_all = "camelCase")]
43pub enum AggregateOp {
44	Sum { field: String },
45	Avg { field: String },
46	Min { field: String },
47	Max { field: String },
48}
49
50/// Aggregation options: group by a field and compute statistics.
51#[derive(Debug, Clone, Serialize, Deserialize)]
52#[serde(rename_all = "camelCase")]
53pub struct AggregateOptions {
54	/// Field to group by. For array fields, each element becomes a separate group.
55	pub group_by: String,
56
57	/// Additional operations per group (count is always included implicitly).
58	#[serde(default, skip_serializing_if = "Vec::is_empty")]
59	pub ops: Vec<AggregateOp>,
60}
61
62/// Query filter for selecting documents.
63///
64/// Supports multiple filter operations on JSON document fields.
65/// A document matches if ALL specified conditions are satisfied (AND logic).
66#[derive(Debug, Clone, Default, Serialize, Deserialize)]
67pub struct QueryFilter {
68	/// Field equality constraints: field_name -> expected_value
69	#[serde(default, skip_serializing_if = "HashMap::is_empty")]
70	pub equals: HashMap<String, Value>,
71
72	/// Field not-equal constraints: field_name -> expected_value
73	#[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "notEquals")]
74	pub not_equals: HashMap<String, Value>,
75
76	/// Field greater-than constraints: field_name -> threshold_value
77	#[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "greaterThan")]
78	pub greater_than: HashMap<String, Value>,
79
80	/// Field greater-than-or-equal constraints: field_name -> threshold_value
81	#[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "greaterThanOrEqual")]
82	pub greater_than_or_equal: HashMap<String, Value>,
83
84	/// Field less-than constraints: field_name -> threshold_value
85	#[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "lessThan")]
86	pub less_than: HashMap<String, Value>,
87
88	/// Field less-than-or-equal constraints: field_name -> threshold_value
89	#[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "lessThanOrEqual")]
90	pub less_than_or_equal: HashMap<String, Value>,
91
92	/// Field in-array constraints: field_name -> array of allowed values
93	#[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "inArray")]
94	pub in_array: HashMap<String, Vec<Value>>,
95
96	/// Array-contains constraints: field_name -> value that must be in the array field
97	#[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "arrayContains")]
98	pub array_contains: HashMap<String, Value>,
99
100	/// Not-in-array constraints: field_name -> array of excluded values
101	#[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "notInArray")]
102	pub not_in_array: HashMap<String, Vec<Value>>,
103
104	/// Array-contains-any constraints: field_name -> array of values (at least one must be in the array field)
105	#[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "arrayContainsAny")]
106	pub array_contains_any: HashMap<String, Vec<Value>>,
107
108	/// Array-contains-all constraints: field_name -> array of values (all must be in the array field)
109	#[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "arrayContainsAll")]
110	pub array_contains_all: HashMap<String, Vec<Value>>,
111}
112
113impl QueryFilter {
114	/// Create a new empty filter (matches all documents).
115	pub fn new() -> Self {
116		Self::default()
117	}
118
119	/// Create a filter with a single equality constraint.
120	pub fn equals_one(field: impl Into<String>, value: Value) -> Self {
121		let mut equals = HashMap::new();
122		equals.insert(field.into(), value);
123		Self { equals, ..Default::default() }
124	}
125
126	/// Add an equality constraint to this filter (builder pattern).
127	pub fn with_equals(mut self, field: impl Into<String>, value: Value) -> Self {
128		self.equals.insert(field.into(), value);
129		self
130	}
131
132	/// Add a not-equal constraint to this filter (builder pattern).
133	pub fn with_not_equals(mut self, field: impl Into<String>, value: Value) -> Self {
134		self.not_equals.insert(field.into(), value);
135		self
136	}
137
138	/// Add a greater-than constraint to this filter (builder pattern).
139	pub fn with_greater_than(mut self, field: impl Into<String>, value: Value) -> Self {
140		self.greater_than.insert(field.into(), value);
141		self
142	}
143
144	/// Add a greater-than-or-equal constraint to this filter (builder pattern).
145	pub fn with_greater_than_or_equal(mut self, field: impl Into<String>, value: Value) -> Self {
146		self.greater_than_or_equal.insert(field.into(), value);
147		self
148	}
149
150	/// Add a less-than constraint to this filter (builder pattern).
151	pub fn with_less_than(mut self, field: impl Into<String>, value: Value) -> Self {
152		self.less_than.insert(field.into(), value);
153		self
154	}
155
156	/// Add a less-than-or-equal constraint to this filter (builder pattern).
157	pub fn with_less_than_or_equal(mut self, field: impl Into<String>, value: Value) -> Self {
158		self.less_than_or_equal.insert(field.into(), value);
159		self
160	}
161
162	/// Add an in-array constraint to this filter (builder pattern).
163	pub fn with_in_array(mut self, field: impl Into<String>, values: Vec<Value>) -> Self {
164		self.in_array.insert(field.into(), values);
165		self
166	}
167
168	/// Add an array-contains constraint to this filter (builder pattern).
169	pub fn with_array_contains(mut self, field: impl Into<String>, value: Value) -> Self {
170		self.array_contains.insert(field.into(), value);
171		self
172	}
173
174	/// Add a not-in-array constraint to this filter (builder pattern).
175	pub fn with_not_in_array(mut self, field: impl Into<String>, values: Vec<Value>) -> Self {
176		self.not_in_array.insert(field.into(), values);
177		self
178	}
179
180	/// Add an array-contains-any constraint to this filter (builder pattern).
181	pub fn with_array_contains_any(mut self, field: impl Into<String>, values: Vec<Value>) -> Self {
182		self.array_contains_any.insert(field.into(), values);
183		self
184	}
185
186	/// Add an array-contains-all constraint to this filter (builder pattern).
187	pub fn with_array_contains_all(mut self, field: impl Into<String>, values: Vec<Value>) -> Self {
188		self.array_contains_all.insert(field.into(), values);
189		self
190	}
191
192	/// Check if a document matches this filter (all conditions must be satisfied).
193	pub fn matches(&self, doc: &Value) -> bool {
194		// Equality checks
195		for (field, expected) in &self.equals {
196			match doc.get(field) {
197				Some(actual) if actual == expected => continue,
198				_ => return false,
199			}
200		}
201
202		// Not-equal checks (missing fields are inherently "not equal")
203		for (field, expected) in &self.not_equals {
204			match doc.get(field) {
205				Some(actual) if actual == expected => return false,
206				_ => continue,
207			}
208		}
209
210		// Greater-than checks
211		for (field, threshold) in &self.greater_than {
212			match doc.get(field) {
213				Some(actual)
214					if compare_json_values(Some(actual), Some(threshold))
215						== std::cmp::Ordering::Greater =>
216				{
217					continue
218				}
219				_ => return false,
220			}
221		}
222
223		// Greater-than-or-equal checks
224		for (field, threshold) in &self.greater_than_or_equal {
225			match doc.get(field) {
226				Some(actual) => {
227					let ord = compare_json_values(Some(actual), Some(threshold));
228					if ord == std::cmp::Ordering::Greater || ord == std::cmp::Ordering::Equal {
229						continue;
230					}
231					return false;
232				}
233				_ => return false,
234			}
235		}
236
237		// Less-than checks
238		for (field, threshold) in &self.less_than {
239			match doc.get(field) {
240				Some(actual)
241					if compare_json_values(Some(actual), Some(threshold))
242						== std::cmp::Ordering::Less =>
243				{
244					continue
245				}
246				_ => return false,
247			}
248		}
249
250		// Less-than-or-equal checks
251		for (field, threshold) in &self.less_than_or_equal {
252			match doc.get(field) {
253				Some(actual) => {
254					let ord = compare_json_values(Some(actual), Some(threshold));
255					if ord == std::cmp::Ordering::Less || ord == std::cmp::Ordering::Equal {
256						continue;
257					}
258					return false;
259				}
260				_ => return false,
261			}
262		}
263
264		// In-array checks (field value must be in the provided array)
265		for (field, allowed_values) in &self.in_array {
266			match doc.get(field) {
267				Some(actual) if allowed_values.contains(actual) => continue,
268				_ => return false,
269			}
270		}
271
272		// Array-contains checks (field must be an array containing the value)
273		for (field, required_value) in &self.array_contains {
274			match doc.get(field) {
275				Some(Value::Array(arr)) if arr.contains(required_value) => continue,
276				_ => return false,
277			}
278		}
279
280		// Not-in-array checks (field value must NOT be in the provided array; missing fields pass)
281		for (field, excluded_values) in &self.not_in_array {
282			match doc.get(field) {
283				Some(actual) if excluded_values.contains(actual) => return false,
284				_ => continue,
285			}
286		}
287
288		// Array-contains-any checks
289		for (field, candidate_values) in &self.array_contains_any {
290			match doc.get(field) {
291				Some(Value::Array(arr)) if candidate_values.iter().any(|v| arr.contains(v)) => {
292					continue
293				}
294				_ => return false,
295			}
296		}
297
298		// Array-contains-all checks
299		for (field, required_values) in &self.array_contains_all {
300			match doc.get(field) {
301				Some(Value::Array(arr)) if required_values.iter().all(|v| arr.contains(v)) => {
302					continue
303				}
304				_ => return false,
305			}
306		}
307
308		true
309	}
310
311	/// Check if this filter is empty (matches all documents).
312	pub fn is_empty(&self) -> bool {
313		self.equals.is_empty()
314			&& self.not_equals.is_empty()
315			&& self.greater_than.is_empty()
316			&& self.greater_than_or_equal.is_empty()
317			&& self.less_than.is_empty()
318			&& self.less_than_or_equal.is_empty()
319			&& self.in_array.is_empty()
320			&& self.array_contains.is_empty()
321			&& self.not_in_array.is_empty()
322			&& self.array_contains_any.is_empty()
323			&& self.array_contains_all.is_empty()
324	}
325}
326
327/// Sort order for a field.
328#[derive(Debug, Clone, Serialize, Deserialize)]
329pub struct SortField {
330	/// Field name to sort by
331	pub field: String,
332
333	/// Sort direction: true for ascending, false for descending
334	pub ascending: bool,
335}
336
337impl SortField {
338	/// Create ascending sort order.
339	pub fn asc(field: impl Into<String>) -> Self {
340		Self { field: field.into(), ascending: true }
341	}
342
343	/// Create descending sort order.
344	pub fn desc(field: impl Into<String>) -> Self {
345		Self { field: field.into(), ascending: false }
346	}
347}
348
349/// Options for querying documents (filter, sort, limit, offset).
350#[derive(Debug, Clone, Default)]
351pub struct QueryOptions {
352	/// Optional filter to select documents
353	pub filter: Option<QueryFilter>,
354
355	/// Optional sort order (multiple fields supported)
356	pub sort: Option<Vec<SortField>>,
357
358	/// Optional limit on number of results
359	pub limit: Option<u32>,
360
361	/// Optional offset for pagination
362	pub offset: Option<u32>,
363
364	/// When set, returns aggregated groups instead of documents.
365	pub aggregate: Option<AggregateOptions>,
366}
367
368impl QueryOptions {
369	/// Create new empty query options (no filter, sort, or limit).
370	pub fn new() -> Self {
371		Self::default()
372	}
373
374	/// Set the filter.
375	pub fn with_filter(mut self, filter: QueryFilter) -> Self {
376		self.filter = Some(filter);
377		self
378	}
379
380	/// Set the sort order.
381	pub fn with_sort(mut self, sort: Vec<SortField>) -> Self {
382		self.sort = Some(sort);
383		self
384	}
385
386	/// Set the limit.
387	pub fn with_limit(mut self, limit: u32) -> Self {
388		self.limit = Some(limit);
389		self
390	}
391
392	/// Set the offset.
393	pub fn with_offset(mut self, offset: u32) -> Self {
394		self.offset = Some(offset);
395		self
396	}
397
398	/// Set the aggregation options.
399	pub fn with_aggregate(mut self, aggregate: AggregateOptions) -> Self {
400		self.aggregate = Some(aggregate);
401		self
402	}
403}
404
405/// Options for subscribing to real-time changes.
406#[derive(Debug, Clone)]
407pub struct SubscriptionOptions {
408	/// Path to subscribe to (e.g., "posts", "posts/abc123/comments")
409	pub path: Box<str>,
410
411	/// Optional filter (only matching changes are sent)
412	pub filter: Option<QueryFilter>,
413}
414
415impl SubscriptionOptions {
416	/// Create a subscription to all changes at a path.
417	pub fn all(path: impl Into<Box<str>>) -> Self {
418		Self { path: path.into(), filter: None }
419	}
420
421	/// Create a subscription with a filter.
422	pub fn filtered(path: impl Into<Box<str>>, filter: QueryFilter) -> Self {
423		Self { path: path.into(), filter: Some(filter) }
424	}
425}
426
427/// Real-time change event emitted when a document is created, updated, or deleted.
428#[derive(Debug, Clone, Serialize, Deserialize)]
429#[serde(tag = "action", rename_all = "camelCase")]
430pub enum ChangeEvent {
431	/// A new document was created
432	Create {
433		/// Full path to the document (e.g., "posts/abc123" or "posts/abc123/comments/xyz789")
434		path: Box<str>,
435		/// Full document data
436		data: Value,
437	},
438
439	/// An existing document was updated
440	Update {
441		/// Full path to the document
442		path: Box<str>,
443		/// Full updated document data
444		data: Value,
445		/// Previous document data (for incremental aggregate computation)
446		#[serde(default, skip_serializing_if = "Option::is_none")]
447		old_data: Option<Value>,
448	},
449
450	/// A document was deleted
451	Delete {
452		/// Full path to the document
453		path: Box<str>,
454		/// Document data before deletion (for incremental aggregate computation)
455		#[serde(default, skip_serializing_if = "Option::is_none")]
456		old_data: Option<Value>,
457	},
458
459	/// A lock was acquired on a document path
460	Lock {
461		/// Full path to the locked document
462		path: Box<str>,
463		/// Lock metadata (userId, mode)
464		data: Value,
465	},
466
467	/// A lock was released on a document path
468	Unlock {
469		/// Full path to the unlocked document
470		path: Box<str>,
471		/// Unlock metadata (userId)
472		data: Value,
473	},
474
475	/// Signals that all initial documents have been yielded for a subscription
476	Ready {
477		/// Subscription path
478		path: Box<str>,
479		/// Optional initial dataset
480		#[serde(default, skip_serializing_if = "Option::is_none")]
481		data: Option<Value>,
482	},
483}
484
485impl ChangeEvent {
486	/// Get the full path from this event.
487	pub fn path(&self) -> &str {
488		match self {
489			ChangeEvent::Create { path, .. } => path,
490			ChangeEvent::Update { path, .. } => path,
491			ChangeEvent::Delete { path, .. } => path,
492			ChangeEvent::Lock { path, .. } => path,
493			ChangeEvent::Unlock { path, .. } => path,
494			ChangeEvent::Ready { path, .. } => path,
495		}
496	}
497
498	/// Get the document ID (last segment of the path).
499	pub fn id(&self) -> Option<&str> {
500		self.path().split('/').next_back()
501	}
502
503	/// Get the parent path (all segments except the last).
504	pub fn parent_path(&self) -> Option<&str> {
505		let path = self.path();
506		path.rfind('/').map(|pos| &path[..pos])
507	}
508
509	/// Get the document data if this is a Create or Update event.
510	pub fn data(&self) -> Option<&Value> {
511		match self {
512			ChangeEvent::Create { data, .. } | ChangeEvent::Update { data, .. } => Some(data),
513			ChangeEvent::Lock { data, .. } | ChangeEvent::Unlock { data, .. } => Some(data),
514			ChangeEvent::Delete { .. } => None,
515			ChangeEvent::Ready { data, .. } => data.as_ref(),
516		}
517	}
518
519	/// Check if this is a Create event.
520	pub fn is_create(&self) -> bool {
521		matches!(self, ChangeEvent::Create { .. })
522	}
523
524	/// Check if this is an Update event.
525	pub fn is_update(&self) -> bool {
526		matches!(self, ChangeEvent::Update { .. })
527	}
528
529	/// Check if this is a Delete event.
530	pub fn is_delete(&self) -> bool {
531		matches!(self, ChangeEvent::Delete { .. })
532	}
533}
534
535/// Compare two JSON values for ordering (used by filter range operators).
536fn compare_json_values(a: Option<&Value>, b: Option<&Value>) -> std::cmp::Ordering {
537	match (a, b) {
538		(None, None) => std::cmp::Ordering::Equal,
539		(None, Some(_)) => std::cmp::Ordering::Less,
540		(Some(_), None) => std::cmp::Ordering::Greater,
541		(Some(Value::Number(a)), Some(Value::Number(b))) => {
542			a.as_f64().partial_cmp(&b.as_f64()).unwrap_or(std::cmp::Ordering::Equal)
543		}
544		(Some(Value::String(a)), Some(Value::String(b))) => a.cmp(b),
545		(Some(Value::Bool(a)), Some(Value::Bool(b))) => a.cmp(b),
546		(Some(a), Some(b)) => a.to_string().cmp(&b.to_string()),
547	}
548}
549
550/// Convert a JSON value to a string key for aggregate group indexing.
551pub fn value_to_group_string(value: &Value) -> String {
552	match value {
553		Value::String(s) => s.clone(),
554		Value::Number(n) => n.to_string(),
555		Value::Bool(b) => b.to_string(),
556		Value::Null => "null".to_string(),
557		_ => serde_json::to_string(value).unwrap_or_default(),
558	}
559}
560
561/// Database statistics.
562#[derive(Debug, Clone, Serialize, Deserialize)]
563pub struct DbStats {
564	/// Total size of database files in bytes
565	pub size_bytes: u64,
566
567	/// Total number of documents across all tables
568	pub record_count: u64,
569
570	/// Number of tables in the database
571	pub table_count: u32,
572}
573
574/// Transaction for atomic write operations.
575///
576/// All write operations must be performed within a transaction to ensure atomicity.
577#[async_trait]
578pub trait Transaction: Send + Sync {
579	/// Create a new document with auto-generated ID. Returns the generated ID.
580	async fn create(&mut self, path: &str, data: Value) -> ClResult<Box<str>>;
581
582	/// Update an existing document (stores the provided data as-is).
583	///
584	/// Note: This method performs a full document replacement at the storage level.
585	/// Merge/PATCH semantics should be handled by the caller before invoking this method.
586	async fn update(&mut self, path: &str, data: Value) -> ClResult<()>;
587
588	/// Delete a document at a path.
589	async fn delete(&mut self, path: &str) -> ClResult<()>;
590
591	/// Read a document from the transaction's view.
592	///
593	/// This method provides transaction-local reads with "read-your-own-writes" semantics:
594	/// - Returns uncommitted changes made by this transaction
595	/// - Provides snapshot isolation from other concurrent transactions
596	/// - Essential for atomic operations like increment, append, etc.
597	///
598	/// # Returns
599	/// - `Ok(Some(value))` if document exists (either committed or written by this transaction)
600	/// - `Ok(None)` if document doesn't exist or was deleted by this transaction
601	/// - `Err` if read operation fails
602	async fn get(&self, path: &str) -> ClResult<Option<Value>>;
603
604	/// Commit the transaction, applying all changes atomically.
605	async fn commit(&mut self) -> ClResult<()>;
606
607	/// Rollback the transaction, discarding all changes.
608	async fn rollback(&mut self) -> ClResult<()>;
609}
610
611/// Real-Time Database Adapter trait.
612///
613/// Unified interface for database backends. Provides transaction-based writes,
614/// queries, and real-time subscriptions.
615#[async_trait]
616pub trait RtdbAdapter: Debug + Send + Sync {
617	/// Begin a new transaction for write operations.
618	async fn transaction(&self, tn_id: TnId, db_id: &str) -> ClResult<Box<dyn Transaction>>;
619
620	/// Close a database instance, flushing pending changes to disk.
621	async fn close_db(&self, tn_id: TnId, db_id: &str) -> ClResult<()>;
622
623	/// Query documents at a path with optional filtering, sorting, and pagination.
624	async fn query(
625		&self,
626		tn_id: TnId,
627		db_id: &str,
628		path: &str,
629		opts: QueryOptions,
630	) -> ClResult<Vec<Value>>;
631
632	/// Get a document at a specific path. Returns None if not found.
633	async fn get(&self, tn_id: TnId, db_id: &str, path: &str) -> ClResult<Option<Value>>;
634
635	/// Subscribe to real-time changes at a path. Returns a stream of ChangeEvents.
636	async fn subscribe(
637		&self,
638		tn_id: TnId,
639		db_id: &str,
640		opts: SubscriptionOptions,
641	) -> ClResult<Pin<Box<dyn Stream<Item = ChangeEvent> + Send>>>;
642
643	/// Create an index on a field to improve query performance.
644	async fn create_index(&self, tn_id: TnId, db_id: &str, path: &str, field: &str)
645		-> ClResult<()>;
646
647	/// Get database statistics (size, record count, table count).
648	async fn stats(&self, tn_id: TnId, db_id: &str) -> ClResult<DbStats>;
649
650	/// Export all documents from a database.
651	///
652	/// Returns all `(path, document)` pairs. The path is relative to the db_id
653	/// (e.g., `posts/abc123`). Used for duplicating RTDB files.
654	async fn export_all(&self, tn_id: TnId, db_id: &str) -> ClResult<Vec<(Box<str>, Value)>>;
655
656	/// Acquire a lock on a document path.
657	///
658	/// Returns `Ok(None)` if the lock was acquired successfully.
659	/// Returns `Ok(Some(LockInfo))` if the path is already locked by another user (denied).
660	async fn acquire_lock(
661		&self,
662		tn_id: TnId,
663		db_id: &str,
664		path: &str,
665		user_id: &str,
666		mode: LockMode,
667		conn_id: &str,
668	) -> ClResult<Option<LockInfo>>;
669
670	/// Release a lock on a document path.
671	async fn release_lock(
672		&self,
673		tn_id: TnId,
674		db_id: &str,
675		path: &str,
676		user_id: &str,
677		conn_id: &str,
678	) -> ClResult<()>;
679
680	/// Check if a path has an active lock. Returns the lock info if locked.
681	async fn check_lock(&self, tn_id: TnId, db_id: &str, path: &str) -> ClResult<Option<LockInfo>>;
682
683	/// Release all locks held by a specific user (called on disconnect).
684	async fn release_all_locks(
685		&self,
686		tn_id: TnId,
687		db_id: &str,
688		user_id: &str,
689		conn_id: &str,
690	) -> ClResult<()>;
691}
692
693// vim: ts=4