ankurah_core/
model.rs

1use ankurah_proto::{Clock, CollectionId, Event, State, ID};
2use tracing::info;
3// use futures_signals::signal::Signal;
4
5use std::sync::Arc;
6use std::sync::Mutex;
7
8use crate::property::PropertyError;
9use crate::property::PropertyValue;
10use crate::{error::RetrievalError, property::Backends};
11
12use anyhow::Result;
13
14use ankql::selection::filter::Filterable;
15
16/// A model is a struct that represents the present values for a given entity
17/// Schema is defined primarily by the Model object, and the View is derived from that via macro.
18pub trait Model {
19    type View: View;
20    type Mutable<'trx>: Mutable<'trx>;
21    fn collection() -> CollectionId;
22    fn create_entity(&self, id: ID) -> Entity;
23}
24
25/// A read only view of an Entity which offers typed accessors
26pub trait View {
27    type Model: Model;
28    type Mutable<'trx>: Mutable<'trx>;
29    fn id(&self) -> ID { self.entity().id }
30    fn backends(&self) -> &Backends { self.entity().backends() }
31    fn collection() -> CollectionId { <Self::Model as Model>::collection() }
32    fn entity(&self) -> &Arc<Entity>;
33    fn from_entity(inner: Arc<Entity>) -> Self;
34    fn to_model(&self) -> Result<Self::Model, PropertyError>;
35}
36
37impl std::fmt::Display for Entity {
38    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
39        write!(f, "Entity({}/{}) = {}", self.collection, self.id, self.head.lock().unwrap())
40    }
41}
42
43/// An entity represents a unique thing within a collection
44#[derive(Debug)]
45pub struct Entity {
46    pub id: ID,
47    pub collection: CollectionId,
48    backends: Backends,
49    head: Arc<Mutex<Clock>>,
50    pub upstream: Option<Arc<Entity>>,
51}
52
53impl Entity {
54    pub fn collection(&self) -> CollectionId { self.collection.clone() }
55
56    pub fn backends(&self) -> &Backends { &self.backends }
57
58    pub fn to_state(&self) -> Result<State> { self.backends.to_state_buffers() }
59
60    // used by the Model macro
61    pub fn create(id: ID, collection: CollectionId, backends: Backends) -> Self {
62        Self { id, collection, backends, head: Arc::new(Mutex::new(Clock::default())), upstream: None }
63    }
64    pub fn from_state(id: ID, collection: CollectionId, state: &State) -> Result<Self, RetrievalError> {
65        let backends = Backends::from_state_buffers(state)?;
66
67        Ok(Self { id, collection, backends, head: Arc::new(Mutex::new(state.head.clone())), upstream: None })
68    }
69
70    /// Collect an event which contains all operations for all backends since the last time they were collected
71    /// Used for transaction commit.
72    /// TODO: We need to think about rollbacks
73    pub fn commit(&self) -> Result<Option<Event>> {
74        let operations = self.backends.to_operations()?;
75        if operations.is_empty() {
76            Ok(None)
77        } else {
78            let event = {
79                let event = Event {
80                    id: ID::new(),
81                    entity_id: self.id.clone(),
82                    collection: self.collection.clone(),
83                    operations,
84                    parent: self.head.lock().unwrap().clone(),
85                };
86
87                // Set the head to the event's ID
88                *self.head.lock().unwrap() = Clock::new([event.id]);
89                event
90            };
91
92            info!("Commit {}", self);
93            Ok(Some(event))
94        }
95    }
96
97    /*
98        entity1: [], head: [],
99        event1: ["blah"], precursors: [],
100        entity1: ["blah"], head: [event1],
101
102        event2: [], precursor: [event1],
103        event3: [], precursor: [event1],
104        event4: [], precursor: [event2, event3],
105        [event4] == [event4, event3, event2, event1]
106
107        enum ClockOrder {
108            Descends,
109            Concurrent,
110            IsDescendedBy,
111            Divergent,
112            ComparisonBudgetExceeded,
113        }
114
115        impl Clock {
116            pub async fn compare(&self, other: &Self, node: &Node) -> ClockOrdering {
117
118            }
119        }
120    */
121
122    pub fn apply_event(&self, event: &Event) -> Result<()> {
123        /*
124           case A: event precursor descends the current head, then set entity clock to singleton of event id
125           case B: event precursor is concurrent to the current head, push event id to event head clock.
126           case C: event precursor is descended by the current head
127        */
128        let head = Clock::new([event.id]);
129        for (backend_name, operations) in &event.operations {
130            // TODO - backends and Entity should not have two copies of the head. Figure out how to unify them
131            self.backends.apply_operations((*backend_name).to_owned(), operations, &head, &event.parent /* , context*/)?;
132        }
133        // TODO figure out how to test this
134        info!("Apply event {}", event);
135
136        *self.head.lock().unwrap() = head.clone();
137        // Hack
138        *self.backends.head.lock().unwrap() = head;
139        info!("Apply event MARK 2 new head {}", self.head.lock().unwrap());
140
141        Ok(())
142    }
143
144    /// HACK - we probably shouldn't be stomping on the backends like this
145    pub fn apply_state(&self, state: &State) -> Result<(), RetrievalError> {
146        self.backends.apply_state(state)?;
147        Ok(())
148    }
149
150    /// Create a snapshot of the Entity which is detached from this one, and will not receive the updates this one does
151    pub fn snapshot(self: &Arc<Self>) -> Arc<Self> {
152        Arc::new(Self {
153            id: self.id.clone(),
154            collection: self.collection.clone(),
155            backends: self.backends.fork(),
156            head: Arc::new(Mutex::new(self.head.lock().unwrap().clone())),
157            upstream: Some(self.clone()),
158        })
159    }
160}
161
162impl Filterable for Entity {
163    fn collection(&self) -> &str { self.collection.as_str() }
164
165    /// TODO Implement this as a typecasted value. eg value<T> -> Option<Result<T>>
166    /// where None is returned if the property is not found, and Err is returned if the property is found but is not able to be typecasted
167    /// to the requested type. (need to think about the rust type system here more)
168    fn value(&self, name: &str) -> Option<String> {
169        if name == "id" {
170            Some(self.id.to_string())
171        } else {
172            // Iterate through backends to find one that has this property
173            let backends = self.backends.backends.lock().unwrap();
174            backends.values().find_map(|backend| match backend.property_value(&name.to_owned()) {
175                Some(value) => match value {
176                    PropertyValue::String(s) => Some(s),
177                    PropertyValue::I16(i) => Some(i.to_string()),
178                    PropertyValue::I32(i) => Some(i.to_string()),
179                    PropertyValue::I64(i) => Some(i.to_string()),
180                    PropertyValue::Object(items) => Some(String::from_utf8_lossy(&items).to_string()),
181                    PropertyValue::Binary(items) => Some(String::from_utf8_lossy(&items).to_string()),
182                },
183                None => None,
184            })
185        }
186    }
187}
188
189/// A mutable Model instance for an Entity with typed accessors.
190/// It is associated with a transaction, and may not outlive said transaction.
191pub trait Mutable<'rec> {
192    type Model: Model;
193    type View: View;
194    fn id(&self) -> ID { self.entity().id }
195    fn collection() -> CollectionId { <Self::Model as Model>::collection() }
196    fn backends(&self) -> &Backends { &self.entity().backends }
197    fn entity(&self) -> &Arc<Entity>;
198    fn new(inner: &'rec Arc<Entity>) -> Self
199    where Self: Sized;
200
201    fn state(&self) -> anyhow::Result<State> { self.entity().to_state() }
202
203    fn read(&self) -> Self::View {
204        let inner: &Arc<Entity> = self.entity();
205
206        let new_inner = match &inner.upstream {
207            // If there is an upstream, use it
208            Some(upstream) => upstream.clone(),
209            // Else we're a new Entity, and we have to rely on the commit to add this to the node
210            None => inner.clone(),
211        };
212
213        Self::View::from_entity(new_inner)
214    }
215}