ankurah_core/
changes.rs

1use crate::{entity::Entity, error::MutationError, model::View, reactor::ChangeNotification};
2use ankurah_proto::{Attested, Event};
3
4#[derive(Debug, Clone)]
5pub struct EntityChange {
6    entity: Entity,
7    events: Vec<Attested<Event>>,
8}
9
10// Implement the trait for EntityChange
11impl ChangeNotification for EntityChange {
12    type Entity = Entity;
13    type Event = ankurah_proto::Attested<Event>;
14
15    fn into_parts(self) -> (Self::Entity, Vec<Self::Event>) { (self.entity, self.events) }
16    fn entity(&self) -> &Self::Entity { &self.entity }
17    fn events(&self) -> &[Self::Event] { &self.events }
18}
19
20// TODO consider a flattened version of EntityChange that includes the entity and Vec<(operations, parent, attestations)> rather than a Vec<Attested<Event>>
21impl EntityChange {
22    pub fn new(entity: Entity, events: Vec<Attested<Event>>) -> Result<Self, MutationError> {
23        // validate that all events have the same entity id as the entity
24        // and that the event ids are present in the entity's head clock
25        for event in &events {
26            let head = entity.head();
27            if event.payload.entity_id != entity.id {
28                return Err(MutationError::InvalidEvent);
29            }
30            if !head.contains(&event.payload.id()) {
31                return Err(MutationError::InvalidEvent);
32            }
33        }
34        Ok(Self { entity, events })
35    }
36    pub fn into_parts(self) -> (Entity, Vec<Attested<Event>>) { (self.entity, self.events) }
37}
38
39#[derive(Debug, Clone)]
40pub enum ItemChange<I> {
41    /// Initial retrieval of an item upon subscription
42    Initial { item: I },
43    /// A new item was added OR changed such that it now matches the subscription
44    Add { item: I, events: Vec<Attested<Event>> },
45    /// A item that previously matched the subscription has changed in a way that has not changed the matching condition
46    Update { item: I, events: Vec<Attested<Event>> },
47    /// A item that previously matched the subscription has changed in a way that no longer matches the subscription
48    Remove { item: I, events: Vec<Attested<Event>> },
49}
50
51impl<I> ItemChange<I> {
52    pub fn entity(&self) -> &I {
53        match self {
54            ItemChange::Initial { item }
55            | ItemChange::Add { item, .. }
56            | ItemChange::Update { item, .. }
57            | ItemChange::Remove { item, .. } => item,
58        }
59    }
60
61    pub fn events(&self) -> &[Attested<Event>] {
62        match self {
63            ItemChange::Add { events, .. } | ItemChange::Update { events, .. } | ItemChange::Remove { events, .. } => events,
64            _ => &[],
65        }
66    }
67    pub fn kind(&self) -> ChangeKind { ChangeKind::from(self) }
68}
69
70impl<I> std::fmt::Display for ItemChange<I>
71where I: View
72{
73    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
74        match self {
75            ItemChange::Initial { item } => {
76                write!(f, "Initial {}/{}", I::collection(), item.id())
77            }
78            ItemChange::Add { item, .. } => {
79                write!(f, "Add {}/{}", I::collection(), item.id())
80            }
81            ItemChange::Update { item, .. } => {
82                write!(f, "Update {}/{}", I::collection(), item.id())
83            }
84            ItemChange::Remove { item, .. } => {
85                write!(f, "Remove {}/{}", I::collection(), item.id())
86            }
87        }
88    }
89}
90
91impl std::fmt::Display for EntityChange {
92    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
93        write!(f, "EntityChange {}/{}", self.entity.collection(), self.entity.id())
94    }
95}
96
97use crate::resultset::ResultSet;
98
99#[derive(Debug, Clone)]
100pub struct ChangeSet<R: View> {
101    pub resultset: ResultSet<R>,
102    pub changes: Vec<ItemChange<R>>,
103}
104
105impl<R: View> ChangeSet<R>
106where R: Clone
107{
108    /// Returns all items that were added or now match the query
109    pub fn adds(&self) -> Vec<R> {
110        self.changes
111            .iter()
112            .filter_map(|change| match change {
113                ItemChange::Add { item, .. } | ItemChange::Initial { item } => Some(item.clone()),
114                _ => None,
115            })
116            .collect()
117    }
118
119    /// Returns all items that were removed or no longer match the query
120    pub fn removes(&self) -> Vec<R> {
121        self.changes
122            .iter()
123            .filter_map(|change| match change {
124                ItemChange::Remove { item, .. } => Some(item.clone()),
125                _ => None,
126            })
127            .collect()
128    }
129
130    /// Returns all items that were updated but still match the query
131    pub fn updates(&self) -> Vec<R> {
132        self.changes
133            .iter()
134            .filter_map(|change| match change {
135                ItemChange::Update { item, .. } => Some(item.clone()),
136                _ => None,
137            })
138            .collect()
139    }
140}
141
142impl<I> std::fmt::Display for ChangeSet<I>
143where I: View + Clone + 'static
144{
145    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
146        // print the number of results in the resultset, and then display each change
147        use ankurah_signals::Peek;
148        let results = self.resultset.peek().len();
149        write!(f, "ChangeSet({results} results): {}", self.changes.iter().map(|c| c.to_string()).collect::<Vec<_>>().join(", "))
150    }
151}
152
153// Note: ChangeSet<Entity> conversion removed since Entity doesn't implement View
154// and ChangeSet is no longer used by Reactor
155
156impl<I> From<ItemChange<Entity>> for ItemChange<I>
157where I: View
158{
159    fn from(change: ItemChange<Entity>) -> Self {
160        match change {
161            ItemChange::Initial { item } => ItemChange::Initial { item: I::from_entity(item) },
162            ItemChange::Add { item, events } => ItemChange::Add { item: I::from_entity(item), events },
163            ItemChange::Update { item, events } => ItemChange::Update { item: I::from_entity(item), events },
164            ItemChange::Remove { item, events } => ItemChange::Remove { item: I::from_entity(item), events },
165        }
166    }
167}
168
/// The kind of an `ItemChange`, without the item or events it carries.
///
/// Derives `Eq` and `Hash` in addition to `PartialEq` so the kind can be used as a
/// key in hashed collections; equality on a fieldless enum is total.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ChangeKind {
    /// Corresponds to `ItemChange::Initial`.
    Initial,
    /// Corresponds to `ItemChange::Add`.
    Add,
    /// Corresponds to `ItemChange::Remove`.
    Remove,
    /// Corresponds to `ItemChange::Update`.
    Update,
}
176
177impl<R> From<&ItemChange<R>> for ChangeKind {
178    fn from(change: &ItemChange<R>) -> Self {
179        match change {
180            ItemChange::Initial { .. } => ChangeKind::Initial,
181            ItemChange::Add { .. } => ChangeKind::Add,
182            ItemChange::Remove { .. } => ChangeKind::Remove,
183            ItemChange::Update { .. } => ChangeKind::Update,
184        }
185    }
186}
187
188// Moved all the ReactorUpdate stuff into reactor.rs because it's specific to the reactor
189// and we have several types of updates, so it's essential to keep them organized