rillrate_protocol/base/list_flow/
state.rs

1use rill_protocol::flow::core::{DataFraction, Flow};
2use rill_protocol::io::provider::StreamType;
3use serde::{Deserialize, Serialize};
4use std::collections::btree_map::Entry;
5use std::collections::BTreeMap;
6use std::hash::Hash;
7
8/// Similar to meta, but every record can be changed separately.
9pub trait ListFlowSpec: DataFraction {
10    type Id: DataFraction + Ord + Hash;
11    type Record: DataFraction;
12    type Action: DataFraction;
13    type Update: DataFraction; // aka `Event`, but inner
14
15    fn update_record(record: &mut Self::Record, update: Self::Update);
16
17    fn no_record_fallback(id: &Self::Id) -> Option<Self::Record> {
18        log::error!("List record with {:?} not found.", id);
19        None
20    }
21
22    fn is_spent(_record: &Self::Record) -> bool {
23        false
24    }
25}
26
27#[derive(Debug, Clone, Serialize, Deserialize)]
28pub struct ListFlowState<T: ListFlowSpec> {
29    // TODO: Use `ListFlowSnapshot` here instead.
30    #[serde(with = "vectorize")]
31    pub records: BTreeMap<T::Id, T::Record>,
32}
33
34#[allow(clippy::new_without_default)]
35impl<T: ListFlowSpec> ListFlowState<T> {
36    pub fn new() -> Self {
37        Self {
38            records: BTreeMap::new(),
39        }
40    }
41}
42
43impl<T: ListFlowSpec> Flow for ListFlowState<T> {
44    type Action = ListActionEnvelope<T>;
45    type Event = ListEventEnvelope<T>;
46
47    fn stream_type() -> StreamType {
48        StreamType::from(module_path!())
49    }
50
51    fn apply(&mut self, event: Self::Event) {
52        match event {
53            ListEventEnvelope::SingleRecord { id, update } => match update {
54                ListFlowEvent::AddRecord { record } => {
55                    self.records.insert(id, record);
56                }
57                ListFlowEvent::UpdateRecord { update } => {
58                    let entry = self.records.entry(id);
59                    match entry {
60                        Entry::Occupied(mut entry) => {
61                            let record = entry.get_mut();
62                            T::update_record(record, update);
63                            if T::is_spent(record) {
64                                entry.remove_entry();
65                            }
66                        }
67                        Entry::Vacant(entry) => {
68                            let fallback = T::no_record_fallback(entry.key());
69                            if let Some(mut record) = fallback {
70                                T::update_record(&mut record, update);
71                                if !T::is_spent(&record) {
72                                    entry.insert(record);
73                                }
74                            }
75                        }
76                    }
77                }
78                ListFlowEvent::RemoveRecord => {
79                    self.records.remove(&id);
80                }
81            },
82            ListEventEnvelope::FullSnapshot { records } => {
83                self.records = records;
84            }
85            ListEventEnvelope::Clear => {
86                self.records.clear();
87            }
88        }
89    }
90}
91
92#[derive(Debug, Clone, Serialize, Deserialize)]
93pub struct ListActionEnvelope<T: ListFlowSpec> {
94    pub id: T::Id,
95    pub action: T::Action,
96}
97
98impl<T: ListFlowSpec> From<(T::Id, T::Action)> for ListActionEnvelope<T> {
99    fn from((id, action): (T::Id, T::Action)) -> Self {
100        Self { id, action }
101    }
102}
103
104#[derive(Debug, Clone, Serialize, Deserialize)]
105pub enum ListEventEnvelope<T: ListFlowSpec> {
106    SingleRecord {
107        id: T::Id,
108        #[serde(bound = "")]
109        update: ListFlowEvent<T>,
110    },
111    FullSnapshot {
112        #[serde(with = "vectorize")]
113        records: BTreeMap<T::Id, T::Record>,
114    },
115    Clear,
116}
117
118#[derive(Debug, Clone, Serialize, Deserialize)]
119pub enum ListFlowEvent<T: ListFlowSpec> {
120    AddRecord { record: T::Record },
121    UpdateRecord { update: T::Update },
122    RemoveRecord,
123}