1use ankurah_proto::{Clock, CollectionId, Event, State, ID};
2use tracing::info;
3use std::sync::Arc;
6use std::sync::Mutex;
7
8use crate::property::PropertyError;
9use crate::property::PropertyValue;
10use crate::{error::RetrievalError, property::Backends};
11
12use anyhow::Result;
13
14use ankql::selection::filter::Filterable;
15
16pub trait Model {
19 type View: View;
20 type Mutable<'trx>: Mutable<'trx>;
21 fn collection() -> CollectionId;
22 fn create_entity(&self, id: ID) -> Entity;
23}
24
25pub trait View {
27 type Model: Model;
28 type Mutable<'trx>: Mutable<'trx>;
29 fn id(&self) -> ID { self.entity().id }
30 fn backends(&self) -> &Backends { self.entity().backends() }
31 fn collection() -> CollectionId { <Self::Model as Model>::collection() }
32 fn entity(&self) -> &Arc<Entity>;
33 fn from_entity(inner: Arc<Entity>) -> Self;
34 fn to_model(&self) -> Result<Self::Model, PropertyError>;
35}
36
37impl std::fmt::Display for Entity {
38 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39 write!(f, "Entity({}/{}) = {}", self.collection, self.id, self.head.lock().unwrap())
40 }
41}
42
43#[derive(Debug)]
45pub struct Entity {
46 pub id: ID,
47 pub collection: CollectionId,
48 backends: Backends,
49 head: Arc<Mutex<Clock>>,
50 pub upstream: Option<Arc<Entity>>,
51}
52
53impl Entity {
54 pub fn collection(&self) -> CollectionId { self.collection.clone() }
55
56 pub fn backends(&self) -> &Backends { &self.backends }
57
58 pub fn to_state(&self) -> Result<State> { self.backends.to_state_buffers() }
59
60 pub fn create(id: ID, collection: CollectionId, backends: Backends) -> Self {
62 Self { id, collection, backends, head: Arc::new(Mutex::new(Clock::default())), upstream: None }
63 }
64 pub fn from_state(id: ID, collection: CollectionId, state: &State) -> Result<Self, RetrievalError> {
65 let backends = Backends::from_state_buffers(state)?;
66
67 Ok(Self { id, collection, backends, head: Arc::new(Mutex::new(state.head.clone())), upstream: None })
68 }
69
70 pub fn commit(&self) -> Result<Option<Event>> {
74 let operations = self.backends.to_operations()?;
75 if operations.is_empty() {
76 Ok(None)
77 } else {
78 let event = {
79 let event = Event {
80 id: ID::new(),
81 entity_id: self.id.clone(),
82 collection: self.collection.clone(),
83 operations,
84 parent: self.head.lock().unwrap().clone(),
85 };
86
87 *self.head.lock().unwrap() = Clock::new([event.id]);
89 event
90 };
91
92 info!("Commit {}", self);
93 Ok(Some(event))
94 }
95 }
96
97 pub fn apply_event(&self, event: &Event) -> Result<()> {
123 let head = Clock::new([event.id]);
129 for (backend_name, operations) in &event.operations {
130 self.backends.apply_operations((*backend_name).to_owned(), operations, &head, &event.parent )?;
132 }
133 info!("Apply event {}", event);
135
136 *self.head.lock().unwrap() = head.clone();
137 *self.backends.head.lock().unwrap() = head;
139 info!("Apply event MARK 2 new head {}", self.head.lock().unwrap());
140
141 Ok(())
142 }
143
144 pub fn apply_state(&self, state: &State) -> Result<(), RetrievalError> {
146 self.backends.apply_state(state)?;
147 Ok(())
148 }
149
150 pub fn snapshot(self: &Arc<Self>) -> Arc<Self> {
152 Arc::new(Self {
153 id: self.id.clone(),
154 collection: self.collection.clone(),
155 backends: self.backends.fork(),
156 head: Arc::new(Mutex::new(self.head.lock().unwrap().clone())),
157 upstream: Some(self.clone()),
158 })
159 }
160}
161
162impl Filterable for Entity {
163 fn collection(&self) -> &str { self.collection.as_str() }
164
165 fn value(&self, name: &str) -> Option<String> {
169 if name == "id" {
170 Some(self.id.to_string())
171 } else {
172 let backends = self.backends.backends.lock().unwrap();
174 backends.values().find_map(|backend| match backend.property_value(&name.to_owned()) {
175 Some(value) => match value {
176 PropertyValue::String(s) => Some(s),
177 PropertyValue::I16(i) => Some(i.to_string()),
178 PropertyValue::I32(i) => Some(i.to_string()),
179 PropertyValue::I64(i) => Some(i.to_string()),
180 PropertyValue::Object(items) => Some(String::from_utf8_lossy(&items).to_string()),
181 PropertyValue::Binary(items) => Some(String::from_utf8_lossy(&items).to_string()),
182 },
183 None => None,
184 })
185 }
186 }
187}
188
189pub trait Mutable<'rec> {
192 type Model: Model;
193 type View: View;
194 fn id(&self) -> ID { self.entity().id }
195 fn collection() -> CollectionId { <Self::Model as Model>::collection() }
196 fn backends(&self) -> &Backends { &self.entity().backends }
197 fn entity(&self) -> &Arc<Entity>;
198 fn new(inner: &'rec Arc<Entity>) -> Self
199 where Self: Sized;
200
201 fn state(&self) -> anyhow::Result<State> { self.entity().to_state() }
202
203 fn read(&self) -> Self::View {
204 let inner: &Arc<Entity> = self.entity();
205
206 let new_inner = match &inner.upstream {
207 Some(upstream) => upstream.clone(),
209 None => inner.clone(),
211 };
212
213 Self::View::from_entity(new_inner)
214 }
215}