aranya_runtime/
client.rs

1use buggy::{Bug, BugExt};
2
3use crate::{
4    Address, Command, CommandId, Engine, EngineError, GraphId, PeerCache, Perspective, Policy,
5    Sink, Storage, StorageError, StorageProvider,
6};
7
8mod braiding;
9mod session;
10mod transaction;
11
12pub use self::{session::Session, transaction::Transaction};
13
14/// An error returned by the runtime client.
15#[derive(Debug, thiserror::Error)]
16pub enum ClientError {
17    #[error("no such parent: {0}")]
18    NoSuchParent(CommandId),
19    #[error("engine error: {0}")]
20    EngineError(EngineError),
21    #[error("storage error: {0}")]
22    StorageError(#[from] StorageError),
23    #[error("init error")]
24    InitError,
25    #[error("not authorized")]
26    NotAuthorized,
27    #[error("session deserialize error: {0}")]
28    SessionDeserialize(#[from] postcard::Error),
29    /// Attempted to braid two parallel finalize commands together.
30    ///
31    /// Policy must be designed such that two parallel finalize commands are never produced.
32    ///
33    /// Currently, this is practically an unrecoverable error. You must wipe all graphs containing
34    /// the "bad" finalize command and resync from the "good" clients. Otherwise, your network will
35    /// split into two separate graph states which can never successfully sync.
36    #[error("found parallel finalize commands during braid")]
37    ParallelFinalize,
38    #[error(transparent)]
39    Bug(#[from] Bug),
40}
41
42impl From<EngineError> for ClientError {
43    fn from(error: EngineError) -> Self {
44        match error {
45            EngineError::Check => Self::NotAuthorized,
46            _ => Self::EngineError(error),
47        }
48    }
49}
50
/// Tracks the graph state of a client.
///
/// - `E` should be an implementation of [`Engine`].
/// - `SP` should be an implementation of [`StorageProvider`].
#[derive(Debug)]
pub struct ClientState<E, SP> {
    /// The engine supplied to [`ClientState::new`].
    engine: E,
    /// The storage provider supplied to [`ClientState::new`].
    provider: SP,
}

impl<E, SP> ClientState<E, SP> {
    /// Builds a `ClientState` from an engine and a storage provider.
    pub const fn new(engine: E, provider: SP) -> Self {
        Self { engine, provider }
    }

    /// Returns mutable access to the [`StorageProvider`].
    pub fn provider(&mut self) -> &mut SP {
        let Self { provider, .. } = self;
        provider
    }
}
72
73impl<E, SP> ClientState<E, SP>
74where
75    E: Engine,
76    SP: StorageProvider,
77{
78    /// Create a new graph (AKA Team). This graph will start with the initial policy
79    /// provided which must be compatible with the engine E. The `payload` is the initial
80    /// init message that will bootstrap the graph facts. Effects produced when processing
81    /// the payload are emitted to the sink.
82    pub fn new_graph(
83        &mut self,
84        policy_data: &[u8],
85        action: <E::Policy as Policy>::Action<'_>,
86        sink: &mut impl Sink<E::Effect>,
87    ) -> Result<GraphId, ClientError> {
88        let policy_id = self.engine.add_policy(policy_data)?;
89        let policy = self.engine.get_policy(policy_id)?;
90
91        let mut perspective = self.provider.new_perspective(policy_id);
92        sink.begin();
93        policy
94            .call_action(action, &mut perspective, sink)
95            .inspect_err(|_| sink.rollback())?;
96        sink.commit();
97
98        let (graph_id, _) = self.provider.new_storage(perspective)?;
99
100        Ok(graph_id)
101    }
102
103    /// Remove a graph (AKA Team). The graph commands will be removed from storage.
104    pub fn remove_graph(&mut self, graph_id: GraphId) -> Result<(), ClientError> {
105        self.provider.remove_storage(graph_id)?;
106
107        Ok(())
108    }
109
110    /// Commit the [`Transaction`] to storage, after merging all temporary heads.
111    pub fn commit(
112        &mut self,
113        trx: &mut Transaction<SP, E>,
114        sink: &mut impl Sink<E::Effect>,
115    ) -> Result<(), ClientError> {
116        trx.commit(&mut self.provider, &mut self.engine, sink)?;
117        Ok(())
118    }
119
120    /// Add commands to the transaction, writing the results to
121    /// `sink`.
122    /// Returns the number of commands that were added.
123    pub fn add_commands(
124        &mut self,
125        trx: &mut Transaction<SP, E>,
126        sink: &mut impl Sink<E::Effect>,
127        commands: &[impl Command],
128    ) -> Result<usize, ClientError> {
129        let count = trx.add_commands(commands, &mut self.provider, &mut self.engine, sink)?;
130        Ok(count)
131    }
132
133    pub fn update_heads(
134        &mut self,
135        storage_id: GraphId,
136        addrs: impl IntoIterator<Item = Address>,
137        request_heads: &mut PeerCache,
138    ) -> Result<(), ClientError> {
139        let storage = self.provider.get_storage(storage_id)?;
140        for address in addrs {
141            if let Some(loc) = storage.get_location(address)? {
142                request_heads.add_command(storage, address, loc)?;
143            }
144        }
145        Ok(())
146    }
147
148    /// Returns the ID of the head of the graph.
149    pub fn head_id(&mut self, storage_id: GraphId) -> Result<CommandId, ClientError> {
150        let storage = self.provider.get_storage(storage_id)?;
151
152        let head = storage.get_head()?;
153        let id = storage.get_command_id(head)?;
154        Ok(id)
155    }
156
157    /// Performs an `action`, writing the results to `sink`.
158    pub fn action(
159        &mut self,
160        storage_id: GraphId,
161        sink: &mut impl Sink<E::Effect>,
162        action: <E::Policy as Policy>::Action<'_>,
163    ) -> Result<(), ClientError> {
164        let storage = self.provider.get_storage(storage_id)?;
165
166        let head = storage.get_head()?;
167
168        let mut perspective = storage
169            .get_linear_perspective(head)?
170            .assume("can always get perspective at head")?;
171
172        let policy_id = perspective.policy();
173        let policy = self.engine.get_policy(policy_id)?;
174
175        // No need to checkpoint the perspective since it is only for this action.
176        // Must checkpoint once we add action transactions.
177
178        sink.begin();
179        match policy.call_action(action, &mut perspective, sink) {
180            Ok(_) => {
181                let segment = storage.write(perspective)?;
182                storage.commit(segment)?;
183                sink.commit();
184                Ok(())
185            }
186            Err(e) => {
187                sink.rollback();
188                Err(e.into())
189            }
190        }
191    }
192}
193
194impl<E, SP> ClientState<E, SP>
195where
196    SP: StorageProvider,
197{
198    /// Create a new [`Transaction`], used to receive [`Command`]s when syncing.
199    pub fn transaction(&mut self, storage_id: GraphId) -> Transaction<SP, E> {
200        Transaction::new(storage_id)
201    }
202
203    /// Create an ephemeral [`Session`] associated with this client.
204    pub fn session(&mut self, storage_id: GraphId) -> Result<Session<SP, E>, ClientError> {
205        Session::new(&mut self.provider, storage_id)
206    }
207}