Skip to main content

cloudillo_types/
rtdb_adapter.rs

1// SPDX-FileCopyrightText: Szilárd Hajba
2// SPDX-License-Identifier: LGPL-3.0-or-later
3
4//! Real-Time Database Adapter
5//!
6//! Trait and types for pluggable real-time database backends that store JSON documents
7//! using hierarchical path-based access (e.g., `posts/abc123/comments/xyz789`).
8//!
9//! Read operations (query, get, subscribe) work directly on the adapter.
10//! Write operations (create, update, delete) require a transaction for atomicity.
11//!
12//! Each adapter implementation provides its own constructor handling backend-specific
13//! initialization (database path, connection settings, etc.).
14
15use async_trait::async_trait;
16use futures_core::Stream;
17use serde::{Deserialize, Serialize};
18use serde_json::Value;
19use std::collections::HashMap;
20use std::fmt::Debug;
21use std::pin::Pin;
22
23use crate::prelude::*;
24
/// Lock mode for document locking.
///
/// Serialized with camelCase variant names, i.e. as `"soft"` or `"hard"`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub enum LockMode {
	/// Soft lock.
	// NOTE(review): enforcement semantics live in the adapter implementation, not in this
	// file -- presumably advisory; confirm.
	Soft,
	/// Hard lock.
	// NOTE(review): presumably exclusive/enforced; confirm against the adapter implementation.
	Hard,
}
32
/// Information about an active lock on a document path.
///
/// Returned by `RtdbAdapter::acquire_lock` (when the request is denied) and by
/// `RtdbAdapter::check_lock` to describe the current lock.
#[derive(Debug, Clone)]
pub struct LockInfo {
	/// Identifier of the user holding the lock.
	pub user_id: Box<str>,
	/// Mode the lock was acquired in.
	pub mode: LockMode,
	/// Time at which the lock was acquired.
	// NOTE(review): unit and epoch are not stated in this file -- presumably a Unix
	// timestamp; confirm against the adapter implementation.
	pub acquired_at: u64,
	/// Time-to-live of the lock, in seconds.
	pub ttl_secs: u64,
}
41
/// An aggregation operation to compute per group.
///
/// Serialized as an internally tagged object: the `op` key carries the camelCase variant
/// name next to the variant's own fields, e.g. `{"op": "sum", "field": "price"}`.
// NOTE(review): how missing or non-numeric field values are treated is decided by the
// adapter implementation and is not visible in this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "op", rename_all = "camelCase")]
pub enum AggregateOp {
	/// Sum of `field` within each group.
	Sum { field: String },
	/// Average of `field` within each group.
	Avg { field: String },
	/// Minimum of `field` within each group.
	Min { field: String },
	/// Maximum of `field` within each group.
	Max { field: String },
}
51
/// Aggregation options: group by a field and compute statistics.
///
/// Serialized with camelCase keys (`groupBy`, `ops`).
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct AggregateOptions {
	/// Field to group by. For array fields, each element becomes a separate group.
	pub group_by: String,

	/// Additional operations per group (count is always included implicitly).
	///
	/// Defaults to an empty list when absent from the input and is left out of the
	/// serialized form when empty.
	#[serde(default, skip_serializing_if = "Vec::is_empty")]
	pub ops: Vec<AggregateOp>,
}
63
64/// Query filter for selecting documents.
65///
66/// Supports multiple filter operations on JSON document fields.
67/// A document matches if ALL specified conditions are satisfied (AND logic).
68#[derive(Debug, Clone, Default, Serialize, Deserialize)]
69pub struct QueryFilter {
70	/// Field equality constraints: field_name -> expected_value
71	#[serde(default, skip_serializing_if = "HashMap::is_empty")]
72	pub equals: HashMap<String, Value>,
73
74	/// Field not-equal constraints: field_name -> expected_value
75	#[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "notEquals")]
76	pub not_equals: HashMap<String, Value>,
77
78	/// Field greater-than constraints: field_name -> threshold_value
79	#[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "greaterThan")]
80	pub greater_than: HashMap<String, Value>,
81
82	/// Field greater-than-or-equal constraints: field_name -> threshold_value
83	#[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "greaterThanOrEqual")]
84	pub greater_than_or_equal: HashMap<String, Value>,
85
86	/// Field less-than constraints: field_name -> threshold_value
87	#[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "lessThan")]
88	pub less_than: HashMap<String, Value>,
89
90	/// Field less-than-or-equal constraints: field_name -> threshold_value
91	#[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "lessThanOrEqual")]
92	pub less_than_or_equal: HashMap<String, Value>,
93
94	/// Field in-array constraints: field_name -> array of allowed values
95	#[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "inArray")]
96	pub in_array: HashMap<String, Vec<Value>>,
97
98	/// Array-contains constraints: field_name -> value that must be in the array field
99	#[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "arrayContains")]
100	pub array_contains: HashMap<String, Value>,
101
102	/// Not-in-array constraints: field_name -> array of excluded values
103	#[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "notInArray")]
104	pub not_in_array: HashMap<String, Vec<Value>>,
105
106	/// Array-contains-any constraints: field_name -> array of values (at least one must be in the array field)
107	#[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "arrayContainsAny")]
108	pub array_contains_any: HashMap<String, Vec<Value>>,
109
110	/// Array-contains-all constraints: field_name -> array of values (all must be in the array field)
111	#[serde(default, skip_serializing_if = "HashMap::is_empty", rename = "arrayContainsAll")]
112	pub array_contains_all: HashMap<String, Vec<Value>>,
113}
114
115impl QueryFilter {
116	/// Create a new empty filter (matches all documents).
117	pub fn new() -> Self {
118		Self::default()
119	}
120
121	/// Create a filter with a single equality constraint.
122	pub fn equals_one(field: impl Into<String>, value: Value) -> Self {
123		let mut equals = HashMap::new();
124		equals.insert(field.into(), value);
125		Self { equals, ..Default::default() }
126	}
127
128	/// Add an equality constraint to this filter (builder pattern).
129	pub fn with_equals(mut self, field: impl Into<String>, value: Value) -> Self {
130		self.equals.insert(field.into(), value);
131		self
132	}
133
134	/// Add a not-equal constraint to this filter (builder pattern).
135	pub fn with_not_equals(mut self, field: impl Into<String>, value: Value) -> Self {
136		self.not_equals.insert(field.into(), value);
137		self
138	}
139
140	/// Add a greater-than constraint to this filter (builder pattern).
141	pub fn with_greater_than(mut self, field: impl Into<String>, value: Value) -> Self {
142		self.greater_than.insert(field.into(), value);
143		self
144	}
145
146	/// Add a greater-than-or-equal constraint to this filter (builder pattern).
147	pub fn with_greater_than_or_equal(mut self, field: impl Into<String>, value: Value) -> Self {
148		self.greater_than_or_equal.insert(field.into(), value);
149		self
150	}
151
152	/// Add a less-than constraint to this filter (builder pattern).
153	pub fn with_less_than(mut self, field: impl Into<String>, value: Value) -> Self {
154		self.less_than.insert(field.into(), value);
155		self
156	}
157
158	/// Add a less-than-or-equal constraint to this filter (builder pattern).
159	pub fn with_less_than_or_equal(mut self, field: impl Into<String>, value: Value) -> Self {
160		self.less_than_or_equal.insert(field.into(), value);
161		self
162	}
163
164	/// Add an in-array constraint to this filter (builder pattern).
165	pub fn with_in_array(mut self, field: impl Into<String>, values: Vec<Value>) -> Self {
166		self.in_array.insert(field.into(), values);
167		self
168	}
169
170	/// Add an array-contains constraint to this filter (builder pattern).
171	pub fn with_array_contains(mut self, field: impl Into<String>, value: Value) -> Self {
172		self.array_contains.insert(field.into(), value);
173		self
174	}
175
176	/// Add a not-in-array constraint to this filter (builder pattern).
177	pub fn with_not_in_array(mut self, field: impl Into<String>, values: Vec<Value>) -> Self {
178		self.not_in_array.insert(field.into(), values);
179		self
180	}
181
182	/// Add an array-contains-any constraint to this filter (builder pattern).
183	pub fn with_array_contains_any(mut self, field: impl Into<String>, values: Vec<Value>) -> Self {
184		self.array_contains_any.insert(field.into(), values);
185		self
186	}
187
188	/// Add an array-contains-all constraint to this filter (builder pattern).
189	pub fn with_array_contains_all(mut self, field: impl Into<String>, values: Vec<Value>) -> Self {
190		self.array_contains_all.insert(field.into(), values);
191		self
192	}
193
194	/// Check if a document matches this filter (all conditions must be satisfied).
195	pub fn matches(&self, doc: &Value) -> bool {
196		// Equality checks
197		for (field, expected) in &self.equals {
198			if doc.get(field) != Some(expected) {
199				return false;
200			}
201		}
202
203		// Not-equal checks (missing fields are inherently "not equal")
204		for (field, expected) in &self.not_equals {
205			if doc.get(field) == Some(expected) {
206				return false;
207			}
208		}
209
210		// Greater-than checks
211		for (field, threshold) in &self.greater_than {
212			match doc.get(field) {
213				Some(actual)
214					if compare_json_values(Some(actual), Some(threshold))
215						== std::cmp::Ordering::Greater => {}
216				_ => return false,
217			}
218		}
219
220		// Greater-than-or-equal checks
221		for (field, threshold) in &self.greater_than_or_equal {
222			match doc.get(field) {
223				Some(actual) => {
224					let ord = compare_json_values(Some(actual), Some(threshold));
225					if ord != std::cmp::Ordering::Greater && ord != std::cmp::Ordering::Equal {
226						return false;
227					}
228				}
229				_ => return false,
230			}
231		}
232
233		// Less-than checks
234		for (field, threshold) in &self.less_than {
235			match doc.get(field) {
236				Some(actual)
237					if compare_json_values(Some(actual), Some(threshold))
238						== std::cmp::Ordering::Less => {}
239				_ => return false,
240			}
241		}
242
243		// Less-than-or-equal checks
244		for (field, threshold) in &self.less_than_or_equal {
245			match doc.get(field) {
246				Some(actual) => {
247					let ord = compare_json_values(Some(actual), Some(threshold));
248					if ord != std::cmp::Ordering::Less && ord != std::cmp::Ordering::Equal {
249						return false;
250					}
251				}
252				_ => return false,
253			}
254		}
255
256		// In-array checks (field value must be in the provided array)
257		for (field, allowed_values) in &self.in_array {
258			match doc.get(field) {
259				Some(actual) if allowed_values.contains(actual) => {}
260				_ => return false,
261			}
262		}
263
264		// Array-contains checks (field must be an array containing the value)
265		for (field, required_value) in &self.array_contains {
266			match doc.get(field) {
267				Some(Value::Array(arr)) if arr.contains(required_value) => {}
268				_ => return false,
269			}
270		}
271
272		// Not-in-array checks (field value must NOT be in the provided array; missing fields pass)
273		for (field, excluded_values) in &self.not_in_array {
274			if let Some(actual) = doc.get(field) {
275				if excluded_values.contains(actual) {
276					return false;
277				}
278			}
279		}
280
281		// Array-contains-any checks
282		for (field, candidate_values) in &self.array_contains_any {
283			match doc.get(field) {
284				Some(Value::Array(arr)) if candidate_values.iter().any(|v| arr.contains(v)) => {}
285				_ => return false,
286			}
287		}
288
289		// Array-contains-all checks
290		for (field, required_values) in &self.array_contains_all {
291			match doc.get(field) {
292				Some(Value::Array(arr)) if required_values.iter().all(|v| arr.contains(v)) => {}
293				_ => return false,
294			}
295		}
296
297		true
298	}
299
300	/// Check if this filter is empty (matches all documents).
301	pub fn is_empty(&self) -> bool {
302		self.equals.is_empty()
303			&& self.not_equals.is_empty()
304			&& self.greater_than.is_empty()
305			&& self.greater_than_or_equal.is_empty()
306			&& self.less_than.is_empty()
307			&& self.less_than_or_equal.is_empty()
308			&& self.in_array.is_empty()
309			&& self.array_contains.is_empty()
310			&& self.not_in_array.is_empty()
311			&& self.array_contains_any.is_empty()
312			&& self.array_contains_all.is_empty()
313	}
314}
315
/// Sort order for a field.
///
/// Serialized with the keys `field` and `ascending`. Use [`SortField::asc`] and
/// [`SortField::desc`] to construct one.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SortField {
	/// Field name to sort by
	pub field: String,

	/// Sort direction: true for ascending, false for descending
	pub ascending: bool,
}
325
326impl SortField {
327	/// Create ascending sort order.
328	pub fn asc(field: impl Into<String>) -> Self {
329		Self { field: field.into(), ascending: true }
330	}
331
332	/// Create descending sort order.
333	pub fn desc(field: impl Into<String>) -> Self {
334		Self { field: field.into(), ascending: false }
335	}
336}
337
/// Options for querying documents (filter, sort, limit, offset).
///
/// Every field is optional; the `Default` value leaves all of them unset. The `with_*`
/// builder methods on this type set one field each.
#[derive(Debug, Clone, Default)]
pub struct QueryOptions {
	/// Optional filter to select documents
	pub filter: Option<QueryFilter>,

	/// Optional sort order (multiple fields supported)
	// NOTE(review): presumably earlier entries take precedence over later ones; the
	// ordering logic lives in the adapter implementation -- confirm.
	pub sort: Option<Vec<SortField>>,

	/// Optional limit on number of results
	pub limit: Option<u32>,

	/// Optional offset for pagination
	pub offset: Option<u32>,

	/// When set, returns aggregated groups instead of documents.
	pub aggregate: Option<AggregateOptions>,
}
356
357impl QueryOptions {
358	/// Create new empty query options (no filter, sort, or limit).
359	pub fn new() -> Self {
360		Self::default()
361	}
362
363	/// Set the filter.
364	pub fn with_filter(mut self, filter: QueryFilter) -> Self {
365		self.filter = Some(filter);
366		self
367	}
368
369	/// Set the sort order.
370	pub fn with_sort(mut self, sort: Vec<SortField>) -> Self {
371		self.sort = Some(sort);
372		self
373	}
374
375	/// Set the limit.
376	pub fn with_limit(mut self, limit: u32) -> Self {
377		self.limit = Some(limit);
378		self
379	}
380
381	/// Set the offset.
382	pub fn with_offset(mut self, offset: u32) -> Self {
383		self.offset = Some(offset);
384		self
385	}
386
387	/// Set the aggregation options.
388	pub fn with_aggregate(mut self, aggregate: AggregateOptions) -> Self {
389		self.aggregate = Some(aggregate);
390		self
391	}
392}
393
/// Options for subscribing to real-time changes.
///
/// Passed to `RtdbAdapter::subscribe`. Use [`SubscriptionOptions::all`] or
/// [`SubscriptionOptions::filtered`] to construct one.
#[derive(Debug, Clone)]
pub struct SubscriptionOptions {
	/// Path to subscribe to (e.g., "posts", "posts/abc123/comments")
	pub path: Box<str>,

	/// Optional filter (only matching changes are sent)
	pub filter: Option<QueryFilter>,
}
403
404impl SubscriptionOptions {
405	/// Create a subscription to all changes at a path.
406	pub fn all(path: impl Into<Box<str>>) -> Self {
407		Self { path: path.into(), filter: None }
408	}
409
410	/// Create a subscription with a filter.
411	pub fn filtered(path: impl Into<Box<str>>, filter: QueryFilter) -> Self {
412		Self { path: path.into(), filter: Some(filter) }
413	}
414}
415
/// Real-time change event emitted when a document is created, updated, or deleted.
///
/// Also carries lock/unlock notifications and the `Ready` marker of a subscription.
///
/// Serialized as an internally tagged object: the `action` key holds the camelCase variant
/// name (`create`, `update`, `delete`, `lock`, `unlock`, `ready`) next to the variant's fields.
// NOTE(review): `rename_all` on an enum renames the variants only, not the fields inside
// struct variants, so `old_data` is emitted as `old_data` while other types in this file use
// camelCase keys. If `oldData` was intended this needs `rename_all_fields` (or a per-variant
// attribute); changing it alters the wire format, so confirm with clients first.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "action", rename_all = "camelCase")]
pub enum ChangeEvent {
	/// A new document was created
	Create {
		/// Full path to the document (e.g., "posts/abc123" or "posts/abc123/comments/xyz789")
		path: Box<str>,
		/// Full document data
		data: Value,
	},

	/// An existing document was updated
	Update {
		/// Full path to the document
		path: Box<str>,
		/// Full updated document data
		data: Value,
		/// Previous document data (for incremental aggregate computation).
		/// Omitted from the serialized form when `None`; defaults to `None` when absent.
		#[serde(default, skip_serializing_if = "Option::is_none")]
		old_data: Option<Value>,
	},

	/// A document was deleted
	Delete {
		/// Full path to the document
		path: Box<str>,
		/// Document data before deletion (for incremental aggregate computation).
		/// Omitted from the serialized form when `None`; defaults to `None` when absent.
		#[serde(default, skip_serializing_if = "Option::is_none")]
		old_data: Option<Value>,
	},

	/// A lock was acquired on a document path
	Lock {
		/// Full path to the locked document
		path: Box<str>,
		/// Lock metadata (userId, mode)
		data: Value,
	},

	/// A lock was released on a document path
	Unlock {
		/// Full path to the unlocked document
		path: Box<str>,
		/// Unlock metadata (userId)
		data: Value,
	},

	/// Signals that all initial documents have been yielded for a subscription
	Ready {
		/// Subscription path
		path: Box<str>,
		/// Optional initial dataset.
		/// Omitted from the serialized form when `None`; defaults to `None` when absent.
		#[serde(default, skip_serializing_if = "Option::is_none")]
		data: Option<Value>,
	},
}
473
474impl ChangeEvent {
475	/// Get the full path from this event.
476	pub fn path(&self) -> &str {
477		match self {
478			ChangeEvent::Create { path, .. }
479			| ChangeEvent::Update { path, .. }
480			| ChangeEvent::Delete { path, .. }
481			| ChangeEvent::Lock { path, .. }
482			| ChangeEvent::Unlock { path, .. }
483			| ChangeEvent::Ready { path, .. } => path,
484		}
485	}
486
487	/// Get the document ID (last segment of the path).
488	pub fn id(&self) -> Option<&str> {
489		self.path().split('/').next_back()
490	}
491
492	/// Get the parent path (all segments except the last).
493	pub fn parent_path(&self) -> Option<&str> {
494		let path = self.path();
495		path.rfind('/').map(|pos| &path[..pos])
496	}
497
498	/// Get the document data if this is a Create or Update event.
499	pub fn data(&self) -> Option<&Value> {
500		match self {
501			ChangeEvent::Create { data, .. }
502			| ChangeEvent::Update { data, .. }
503			| ChangeEvent::Lock { data, .. }
504			| ChangeEvent::Unlock { data, .. } => Some(data),
505			ChangeEvent::Delete { .. } => None,
506			ChangeEvent::Ready { data, .. } => data.as_ref(),
507		}
508	}
509
510	/// Check if this is a Create event.
511	pub fn is_create(&self) -> bool {
512		matches!(self, ChangeEvent::Create { .. })
513	}
514
515	/// Check if this is an Update event.
516	pub fn is_update(&self) -> bool {
517		matches!(self, ChangeEvent::Update { .. })
518	}
519
520	/// Check if this is a Delete event.
521	pub fn is_delete(&self) -> bool {
522		matches!(self, ChangeEvent::Delete { .. })
523	}
524}
525
526/// Compare two JSON values for ordering (used by filter range operators).
527fn compare_json_values(a: Option<&Value>, b: Option<&Value>) -> std::cmp::Ordering {
528	match (a, b) {
529		(None, None) => std::cmp::Ordering::Equal,
530		(None, Some(_)) => std::cmp::Ordering::Less,
531		(Some(_), None) => std::cmp::Ordering::Greater,
532		(Some(Value::Number(a)), Some(Value::Number(b))) => {
533			a.as_f64().partial_cmp(&b.as_f64()).unwrap_or(std::cmp::Ordering::Equal)
534		}
535		(Some(Value::String(a)), Some(Value::String(b))) => a.cmp(b),
536		(Some(Value::Bool(a)), Some(Value::Bool(b))) => a.cmp(b),
537		(Some(a), Some(b)) => a.to_string().cmp(&b.to_string()),
538	}
539}
540
541/// Convert a JSON value to a string key for aggregate group indexing.
542pub fn value_to_group_string(value: &Value) -> String {
543	match value {
544		Value::String(s) => s.clone(),
545		Value::Number(n) => n.to_string(),
546		Value::Bool(b) => b.to_string(),
547		Value::Null => "null".to_string(),
548		_ => serde_json::to_string(value).unwrap_or_default(),
549	}
550}
551
/// Database statistics.
///
/// Returned by `RtdbAdapter::stats`. Serialized with the field names as written
/// (`size_bytes`, `record_count`, `table_count`); no rename attribute is applied.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DbStats {
	/// Total size of database files in bytes
	pub size_bytes: u64,

	/// Total number of documents across all tables
	pub record_count: u64,

	/// Number of tables in the database
	pub table_count: u32,
}
564
/// Transaction for atomic write operations.
///
/// All write operations must be performed within a transaction to ensure atomicity.
/// Instances are obtained from `RtdbAdapter::transaction`. Changes are applied by
/// [`commit`](Transaction::commit) and discarded by [`rollback`](Transaction::rollback).
#[async_trait]
pub trait Transaction: Send + Sync {
	/// Create a new document with auto-generated ID. Returns the generated ID.
	// NOTE(review): `path` is presumably the collection path under which the document is
	// created (e.g. "posts"); confirm against the adapter implementation.
	async fn create(&mut self, path: &str, data: Value) -> ClResult<Box<str>>;

	/// Update an existing document (stores the provided data as-is).
	///
	/// Note: This method performs a full document replacement at the storage level.
	/// Merge/PATCH semantics should be handled by the caller before invoking this method.
	async fn update(&mut self, path: &str, data: Value) -> ClResult<()>;

	/// Delete a document at a path.
	async fn delete(&mut self, path: &str) -> ClResult<()>;

	/// Read a document from the transaction's view.
	///
	/// This method provides transaction-local reads with "read-your-own-writes" semantics:
	/// - Returns uncommitted changes made by this transaction
	/// - Provides snapshot isolation from other concurrent transactions
	/// - Essential for atomic operations like increment, append, etc.
	///
	/// # Returns
	/// - `Ok(Some(value))` if document exists (either committed or written by this transaction)
	/// - `Ok(None)` if document doesn't exist or was deleted by this transaction
	/// - `Err` if read operation fails
	async fn get(&self, path: &str) -> ClResult<Option<Value>>;

	/// Commit the transaction, applying all changes atomically.
	// NOTE(review): takes `&mut self`, so the object remains usable afterwards; whether
	// further writes after a commit are allowed is up to the implementation -- confirm.
	async fn commit(&mut self) -> ClResult<()>;

	/// Rollback the transaction, discarding all changes.
	async fn rollback(&mut self) -> ClResult<()>;
}
601
/// Real-Time Database Adapter trait.
///
/// Unified interface for database backends. Provides transaction-based writes,
/// queries, and real-time subscriptions.
///
/// Every method takes a `tn_id` and a `db_id` that together select the database instance
/// to operate on.
// NOTE(review): `TnId` comes from the crate prelude -- presumably a tenant identifier; confirm.
#[async_trait]
pub trait RtdbAdapter: Debug + Send + Sync {
	/// Begin a new transaction for write operations.
	async fn transaction(&self, tn_id: TnId, db_id: &str) -> ClResult<Box<dyn Transaction>>;

	/// Close a database instance, flushing pending changes to disk.
	async fn close_db(&self, tn_id: TnId, db_id: &str) -> ClResult<()>;

	/// Query documents at a path with optional filtering, sorting, and pagination.
	///
	/// When `opts.aggregate` is set, the returned values are aggregated groups instead of
	/// documents (see `QueryOptions::aggregate`).
	async fn query(
		&self,
		tn_id: TnId,
		db_id: &str,
		path: &str,
		opts: QueryOptions,
	) -> ClResult<Vec<Value>>;

	/// Get a document at a specific path. Returns None if not found.
	async fn get(&self, tn_id: TnId, db_id: &str, path: &str) -> ClResult<Option<Value>>;

	/// Subscribe to real-time changes at a path. Returns a stream of ChangeEvents.
	///
	/// The path and the optional filter are taken from `opts`.
	async fn subscribe(
		&self,
		tn_id: TnId,
		db_id: &str,
		opts: SubscriptionOptions,
	) -> ClResult<Pin<Box<dyn Stream<Item = ChangeEvent> + Send>>>;

	/// Create an index on a field to improve query performance.
	async fn create_index(&self, tn_id: TnId, db_id: &str, path: &str, field: &str)
		-> ClResult<()>;

	/// Get database statistics (size, record count, table count).
	async fn stats(&self, tn_id: TnId, db_id: &str) -> ClResult<DbStats>;

	/// Export all documents from a database.
	///
	/// Returns all `(path, document)` pairs. The path is relative to the db_id
	/// (e.g., `posts/abc123`). Used for duplicating RTDB files.
	async fn export_all(&self, tn_id: TnId, db_id: &str) -> ClResult<Vec<(Box<str>, Value)>>;

	/// Acquire a lock on a document path.
	///
	/// Returns `Ok(None)` if the lock was acquired successfully.
	/// Returns `Ok(Some(LockInfo))` if the path is already locked by another user (denied).
	// NOTE(review): `conn_id` presumably identifies the client connection requesting the
	// lock, so one user's connections can be told apart; confirm against the implementation.
	async fn acquire_lock(
		&self,
		tn_id: TnId,
		db_id: &str,
		path: &str,
		user_id: &str,
		mode: LockMode,
		conn_id: &str,
	) -> ClResult<Option<LockInfo>>;

	/// Release a lock on a document path.
	// NOTE(review): behavior when the lock is held by a different user/connection is not
	// specified here; confirm against the implementation.
	async fn release_lock(
		&self,
		tn_id: TnId,
		db_id: &str,
		path: &str,
		user_id: &str,
		conn_id: &str,
	) -> ClResult<()>;

	/// Check if a path has an active lock. Returns the lock info if locked.
	async fn check_lock(&self, tn_id: TnId, db_id: &str, path: &str) -> ClResult<Option<LockInfo>>;

	/// Release all locks held by a specific user (called on disconnect).
	// NOTE(review): both `user_id` and `conn_id` are passed -- presumably only the locks taken
	// through that connection are released; confirm against the implementation.
	async fn release_all_locks(
		&self,
		tn_id: TnId,
		db_id: &str,
		user_id: &str,
		conn_id: &str,
	) -> ClResult<()>;
}
683
684// vim: ts=4