aper/
aper.rs

1use crate::{
2    connection::{ClientConnection, MessageToServer},
3    store::Store,
4    Mutation, StoreHandle,
5};
6use serde::{Deserialize, Serialize};
7use std::{
8    collections::{HashSet, VecDeque},
9    fmt::Debug,
10};
11
12pub trait AperSync {
13    fn attach(map: StoreHandle) -> Self;
14
15    fn listen<F: Fn() -> bool + 'static + Send + Sync>(&self, listener: F) {
16        // Default implementation does nothing.
17    }
18}
19
20pub trait Aper: AperSync {
21    type Intent: Clone + Serialize + for<'de> Deserialize<'de> + PartialEq;
22    type Error: Debug;
23
24    fn apply(&mut self, intent: &Self::Intent) -> Result<(), Self::Error>;
25}
26
/// An intent paired with the version number it was recorded under.
struct SpeculativeIntent<I> {
    /// The version number associated with `intent`.
    version: u64,
    /// The recorded intent itself.
    intent: I,
}
31
32pub struct AperClient<A: Aper> {
33    store: Store,
34    intent_stack: VecDeque<SpeculativeIntent<A::Intent>>,
35
36    /// The next unused client version number for this client.
37    next_client_version: u64,
38
39    /// The highest *local* client version that has been confirmed by the server.
40    verified_client_version: u64,
41
42    /// The highest *server* version that has been confirmed by the server.
43    /// Note that server and client versions are not related.
44    verified_server_version: u64,
45}
46
47impl<A: Aper> Default for AperClient<A> {
48    fn default() -> Self {
49        Self::new()
50    }
51}
52
53impl<A: Aper> AperClient<A> {
54    pub fn new() -> Self {
55        let map = Store::default();
56        // add an overlay for speculative (local) changes
57        map.push_overlay();
58
59        Self {
60            store: map,
61            intent_stack: VecDeque::new(),
62            next_client_version: 1,
63            verified_client_version: 0,
64            verified_server_version: 0,
65        }
66    }
67
68    pub fn store(&self) -> Store {
69        self.store.clone()
70    }
71
72    pub fn connect<F: Fn(MessageToServer) + 'static, FS: Fn(A, u32) + 'static>(
73        self,
74        message_callback: F,
75        state_callback: FS,
76    ) -> ClientConnection<A> {
77        ClientConnection::new(self, message_callback, state_callback)
78    }
79
80    pub fn state(&self) -> A {
81        A::attach(self.store.handle())
82    }
83
84    pub fn verified_client_version(&self) -> u64 {
85        self.verified_client_version
86    }
87
88    pub fn speculative_client_version(&self) -> u64 {
89        self.intent_stack
90            .back()
91            .map_or(self.verified_client_version, |index| index.version)
92    }
93
94    pub fn verified_server_version(&self) -> u64 {
95        self.verified_server_version
96    }
97
98    /// Apply a mutation to the local client state.
99    pub fn apply(&mut self, intent: &A::Intent) -> Result<u64, A::Error> {
100        self.store.push_overlay();
101
102        {
103            let mut sm = A::attach(self.store.handle());
104
105            if let Err(e) = sm.apply(intent) {
106                // reverse changes.
107                self.store.pop_overlay();
108                return Err(e);
109            }
110        }
111
112        let version = self.next_client_version;
113        self.intent_stack.push_back(SpeculativeIntent {
114            intent: intent.clone(),
115            version,
116        });
117        self.next_client_version += 1;
118
119        self.store.combine_down();
120        self.store.notify_dirty();
121
122        Ok(version)
123    }
124
125    /// Mutate the local client state according to server-verified mutations.
126    pub fn mutate(
127        &mut self,
128        mutations: &[Mutation],
129        client_version: Option<u64>,
130        server_version: u64,
131    ) {
132        // pop speculative overlay
133        // TODO: we need to capture notifications from the speculative overlay being popped, since it could
134        // undo changes that are not re-done.
135        self.store.pop_overlay();
136        self.verified_server_version = server_version;
137
138        self.store.mutate(mutations);
139
140        // push new speculative overlay
141        self.store.push_overlay();
142
143        if let Some(version) = client_version {
144            self.verified_client_version = version;
145
146            if let Some(index) = self.intent_stack.front() {
147                if index.version == version {
148                    self.intent_stack.pop_front();
149                    // happy case; no need to recompute other speculative intents
150                    return;
151                }
152            }
153
154            while let Some(index) = self.intent_stack.front() {
155                if index.version > version {
156                    break;
157                }
158
159                self.intent_stack.pop_front();
160            }
161        }
162
163        for speculative_intent in self.intent_stack.iter() {
164            // push a working overlay
165            self.store.push_overlay();
166
167            let mut sm = A::attach(self.store.handle());
168
169            if sm.apply(&speculative_intent.intent).is_err() {
170                // reverse changes.
171                self.store.pop_overlay();
172                continue;
173            }
174
175            self.store.combine_down();
176        }
177
178        self.store.notify_dirty();
179    }
180}
181
182pub struct AperServer<A: Aper> {
183    map: Store,
184    version: u64,
185    _phantom: std::marker::PhantomData<A>,
186}
187
188impl<A: Aper> Default for AperServer<A> {
189    fn default() -> Self {
190        Self::new()
191    }
192}
193
194impl<A: Aper> AperServer<A> {
195    pub fn new() -> Self {
196        let map = Store::default();
197
198        Self {
199            map,
200            version: 0,
201            _phantom: std::marker::PhantomData,
202        }
203    }
204
205    pub fn version(&self) -> u64 {
206        self.version
207    }
208
209    pub fn state_snapshot(&self) -> Vec<Mutation> {
210        // this works because the server only has one layer
211        self.map.top_layer_mutations()
212    }
213
214    pub fn apply(&mut self, intent: &A::Intent) -> Result<Vec<Mutation>, A::Error> {
215        self.map.push_overlay();
216
217        let mut sm = A::attach(self.map.handle());
218
219        if let Err(e) = sm.apply(intent) {
220            // reverse changes.
221            self.map.pop_overlay();
222            return Err(e);
223        }
224
225        self.version += 1;
226
227        let mutations = self.map.top_layer_mutations();
228        self.map.combine_down();
229
230        Ok(mutations)
231    }
232
233    pub fn state(&self) -> A {
234        A::attach(self.map.handle())
235    }
236}