nodedb_crdt/state/
core.rs1use std::collections::HashSet;
6
7use loro::{LoroDoc, LoroMap, LoroValue, ValueOrContainer};
8
9use crate::error::{CrdtError, Result};
10use crate::validator::bitemporal::{VALID_UNTIL, VALID_UNTIL_OPEN};
11
12fn row_is_live(row: &LoroMap) -> bool {
17 match row.get(VALID_UNTIL) {
18 None => true,
19 Some(ValueOrContainer::Value(LoroValue::Null)) => true,
20 Some(ValueOrContainer::Value(LoroValue::I64(n))) => n == VALID_UNTIL_OPEN,
21 _ => true,
22 }
23}
24
25pub struct CrdtState {
27 pub(super) doc: LoroDoc,
28 pub(super) peer_id: u64,
29 array_surrogate_ids: HashSet<String>,
35}
36
37impl CrdtState {
38 pub fn new(peer_id: u64) -> Result<Self> {
40 let doc = LoroDoc::new();
41 doc.set_peer_id(peer_id)
42 .map_err(|e| CrdtError::Loro(format!("failed to set peer_id {peer_id}: {e}")))?;
43 Ok(Self {
44 doc,
45 peer_id,
46 array_surrogate_ids: HashSet::new(),
47 })
48 }
49
50 pub fn register_array_surrogate(&mut self, id: String) {
58 self.array_surrogate_ids.insert(id);
59 }
60
61 pub fn upsert(
63 &self,
64 collection: &str,
65 row_id: &str,
66 fields: &[(&str, LoroValue)],
67 ) -> Result<()> {
68 let coll = self.doc.get_map(collection);
69 let row_container = coll
70 .insert_container(row_id, LoroMap::new())
71 .map_err(|e| CrdtError::Loro(e.to_string()))?;
72 for (field, value) in fields {
73 row_container
74 .insert(field, value.clone())
75 .map_err(|e| CrdtError::Loro(e.to_string()))?;
76 }
77 Ok(())
78 }
79
80 pub fn delete(&self, collection: &str, row_id: &str) -> Result<()> {
82 let coll = self.doc.get_map(collection);
83 coll.delete(row_id)
84 .map_err(|e| CrdtError::Loro(e.to_string()))?;
85 Ok(())
86 }
87
88 pub fn clear_collection(&self, collection: &str) -> Result<usize> {
90 let coll = self.doc.get_map(collection);
91 let keys: Vec<String> = coll.keys().map(|k| k.to_string()).collect();
92 let count = keys.len();
93 for key in &keys {
94 coll.delete(key)
95 .map_err(|e| CrdtError::Loro(e.to_string()))?;
96 }
97 Ok(count)
98 }
99
100 pub fn read_row(&self, collection: &str, row_id: &str) -> Option<LoroValue> {
105 let coll = self.doc.get_map(collection);
106 match coll.get(row_id)? {
107 ValueOrContainer::Container(loro::Container::Map(m)) => Some(m.get_value()),
108 ValueOrContainer::Container(loro::Container::List(l)) => Some(l.get_value()),
109 ValueOrContainer::Container(_) => Some(LoroValue::Null),
110 ValueOrContainer::Value(v) => Some(v),
111 }
112 }
113
114 pub fn read_field(&self, collection: &str, row_id: &str, field: &str) -> Option<LoroValue> {
123 let coll = self.doc.get_map(collection);
124 let row_map = match coll.get(row_id)? {
125 ValueOrContainer::Container(loro::Container::Map(m)) => m,
126 ValueOrContainer::Value(v) => return Some(v),
127 _ => return None,
128 };
129 match row_map.get(field)? {
130 ValueOrContainer::Value(v) => Some(v),
131 ValueOrContainer::Container(loro::Container::Map(m)) => Some(m.get_value()),
132 ValueOrContainer::Container(loro::Container::List(l)) => Some(l.get_value()),
133 ValueOrContainer::Container(_) => Some(LoroValue::Null),
134 }
135 }
136
137 pub fn row_exists(&self, collection: &str, row_id: &str) -> bool {
144 let coll = self.doc.get_map(collection);
145 if coll.get(row_id).is_some() {
146 return true;
147 }
148 self.array_surrogate_ids.contains(row_id)
149 }
150
151 pub fn collection_names(&self) -> Vec<String> {
153 let root = self.doc.get_deep_value();
154 match root {
155 LoroValue::Map(map) => map.keys().map(|k| k.to_string()).collect(),
156 _ => Vec::new(),
157 }
158 }
159
160 pub fn row_ids(&self, collection: &str) -> Vec<String> {
162 let coll = self.doc.get_map(collection);
163 coll.keys().map(|k| k.to_string()).collect()
164 }
165
166 pub fn field_value_exists(&self, collection: &str, field: &str, value: &LoroValue) -> bool {
169 let coll = self.doc.get_map(collection);
170 for key in coll.keys() {
171 let path = format!("{collection}/{key}/{field}");
172 if let Some(voc) = self.doc.get_by_str_path(&path) {
173 let field_val = match voc {
174 ValueOrContainer::Value(v) => v,
175 ValueOrContainer::Container(_) => {
176 continue;
177 }
178 };
179 if &field_val == value {
180 return true;
181 }
182 }
183 }
184 false
185 }
186
187 pub fn field_value_exists_live(
194 &self,
195 collection: &str,
196 field: &str,
197 value: &LoroValue,
198 ) -> bool {
199 let coll = self.doc.get_map(collection);
200 for key in coll.keys() {
201 let row_map = match coll.get(&key) {
202 Some(ValueOrContainer::Container(loro::Container::Map(m))) => m,
203 _ => continue,
204 };
205 if !row_is_live(&row_map) {
206 continue;
207 }
208 let field_val = match row_map.get(field) {
209 Some(ValueOrContainer::Value(v)) => v,
210 _ => continue,
211 };
212 if &field_val == value {
213 return true;
214 }
215 }
216 false
217 }
218
219 pub fn live_row_ids(&self, collection: &str) -> Vec<String> {
223 let coll = self.doc.get_map(collection);
224 let mut out = Vec::new();
225 for key in coll.keys() {
226 let row_map = match coll.get(&key) {
227 Some(ValueOrContainer::Container(loro::Container::Map(m))) => m,
228 _ => continue,
229 };
230 if row_is_live(&row_map) {
231 out.push(key.to_string());
232 }
233 }
234 out
235 }
236
237 pub fn doc(&self) -> &LoroDoc {
239 &self.doc
240 }
241
242 pub fn peer_id(&self) -> u64 {
244 self.peer_id
245 }
246}