rillrate_protocol/base/list_flow/
state.rs1use rill_protocol::flow::core::{DataFraction, Flow};
2use rill_protocol::io::provider::StreamType;
3use serde::{Deserialize, Serialize};
4use std::collections::btree_map::Entry;
5use std::collections::BTreeMap;
6use std::hash::Hash;
7
8pub trait ListFlowSpec: DataFraction {
10 type Id: DataFraction + Ord + Hash;
11 type Record: DataFraction;
12 type Action: DataFraction;
13 type Update: DataFraction; fn update_record(record: &mut Self::Record, update: Self::Update);
16
17 fn no_record_fallback(id: &Self::Id) -> Option<Self::Record> {
18 log::error!("List record with {:?} not found.", id);
19 None
20 }
21
22 fn is_spent(_record: &Self::Record) -> bool {
23 false
24 }
25}
26
27#[derive(Debug, Clone, Serialize, Deserialize)]
28pub struct ListFlowState<T: ListFlowSpec> {
29 #[serde(with = "vectorize")]
31 pub records: BTreeMap<T::Id, T::Record>,
32}
33
34#[allow(clippy::new_without_default)]
35impl<T: ListFlowSpec> ListFlowState<T> {
36 pub fn new() -> Self {
37 Self {
38 records: BTreeMap::new(),
39 }
40 }
41}
42
43impl<T: ListFlowSpec> Flow for ListFlowState<T> {
44 type Action = ListActionEnvelope<T>;
45 type Event = ListEventEnvelope<T>;
46
47 fn stream_type() -> StreamType {
48 StreamType::from(module_path!())
49 }
50
51 fn apply(&mut self, event: Self::Event) {
52 match event {
53 ListEventEnvelope::SingleRecord { id, update } => match update {
54 ListFlowEvent::AddRecord { record } => {
55 self.records.insert(id, record);
56 }
57 ListFlowEvent::UpdateRecord { update } => {
58 let entry = self.records.entry(id);
59 match entry {
60 Entry::Occupied(mut entry) => {
61 let record = entry.get_mut();
62 T::update_record(record, update);
63 if T::is_spent(record) {
64 entry.remove_entry();
65 }
66 }
67 Entry::Vacant(entry) => {
68 let fallback = T::no_record_fallback(entry.key());
69 if let Some(mut record) = fallback {
70 T::update_record(&mut record, update);
71 if !T::is_spent(&record) {
72 entry.insert(record);
73 }
74 }
75 }
76 }
77 }
78 ListFlowEvent::RemoveRecord => {
79 self.records.remove(&id);
80 }
81 },
82 ListEventEnvelope::FullSnapshot { records } => {
83 self.records = records;
84 }
85 ListEventEnvelope::Clear => {
86 self.records.clear();
87 }
88 }
89 }
90}
91
92#[derive(Debug, Clone, Serialize, Deserialize)]
93pub struct ListActionEnvelope<T: ListFlowSpec> {
94 pub id: T::Id,
95 pub action: T::Action,
96}
97
98impl<T: ListFlowSpec> From<(T::Id, T::Action)> for ListActionEnvelope<T> {
99 fn from((id, action): (T::Id, T::Action)) -> Self {
100 Self { id, action }
101 }
102}
103
104#[derive(Debug, Clone, Serialize, Deserialize)]
105pub enum ListEventEnvelope<T: ListFlowSpec> {
106 SingleRecord {
107 id: T::Id,
108 #[serde(bound = "")]
109 update: ListFlowEvent<T>,
110 },
111 FullSnapshot {
112 #[serde(with = "vectorize")]
113 records: BTreeMap<T::Id, T::Record>,
114 },
115 Clear,
116}
117
118#[derive(Debug, Clone, Serialize, Deserialize)]
119pub enum ListFlowEvent<T: ListFlowSpec> {
120 AddRecord { record: T::Record },
121 UpdateRecord { update: T::Update },
122 RemoveRecord,
123}