aranya_runtime/
client.rs

1use core::iter::DoubleEndedIterator;
2
3use buggy::Bug;
4use tracing::error;
5
6use crate::{
7    Address, CmdId, Command, Engine, EngineError, GraphId, PeerCache, Perspective as _, Policy,
8    Sink, Storage as _, StorageError, StorageProvider, engine::ActionPlacement,
9};
10
11mod braiding;
12mod session;
13mod transaction;
14
15pub use self::{session::Session, transaction::Transaction};
16
/// An error returned by the runtime client.
///
/// Storage errors, `postcard` errors and [`Bug`]s convert into this type automatically via
/// `#[from]`. [`EngineError`] converts through a hand-written `From` impl so that
/// [`EngineError::Check`] can be reported as [`ClientError::NotAuthorized`].
#[derive(Debug, thiserror::Error)]
pub enum ClientError {
    /// The parent command identified by the contained [`CmdId`] could not be found.
    #[error("no such parent: {0}")]
    NoSuchParent(CmdId),
    /// The engine reported an error.
    ///
    /// The `From<EngineError>` conversion never produces this variant for
    /// [`EngineError::Check`]; that case is mapped to [`ClientError::NotAuthorized`] instead.
    #[error("engine error: {0}")]
    EngineError(EngineError),
    /// The storage layer reported an error.
    #[error("storage error: {0}")]
    StorageError(#[from] StorageError),
    /// Initialization failed.
    ///
    /// NOTE(review): not constructed in this module's own code; presumably raised by a
    /// submodule. Confirm where it originates.
    #[error("init error")]
    InitError,
    /// The operation was not authorized; produced from [`EngineError::Check`].
    #[error("not authorized")]
    NotAuthorized,
    /// Session data could not be deserialized by `postcard`.
    #[error("session deserialize error: {0}")]
    SessionDeserialize(#[from] postcard::Error),
    /// Attempted to braid two parallel finalize commands together.
    ///
    /// Policy must be designed such that two parallel finalize commands are never produced.
    ///
    /// Currently, this is practically an unrecoverable error. You must wipe all graphs containing
    /// the "bad" finalize command and resync from the "good" clients. Otherwise, your network will
    /// split into two separate graph states which can never successfully sync.
    #[error("found parallel finalize commands during braid")]
    ParallelFinalize,
    /// An internal invariant was violated; displays as the wrapped [`Bug`] itself.
    #[error(transparent)]
    Bug(#[from] Bug),
}
44
45impl From<EngineError> for ClientError {
46    fn from(error: EngineError) -> Self {
47        match error {
48            EngineError::Check => Self::NotAuthorized,
49            _ => Self::EngineError(error),
50        }
51    }
52}
53
/// Tracks the state of a client's graphs.
///
/// The type parameters are unconstrained on the struct itself; the operations that need them
/// add their own bounds:
///
/// - `E` should be an implementation of [`Engine`].
/// - `SP` should be an implementation of [`StorageProvider`].
#[derive(Debug)]
pub struct ClientState<E, SP> {
    /// The engine handed to [`ClientState::new`].
    engine: E,
    /// The storage provider handed to [`ClientState::new`].
    provider: SP,
}

impl<E, SP> ClientState<E, SP> {
    /// Builds a `ClientState` that owns the given `engine` and `provider`.
    pub const fn new(engine: E, provider: SP) -> Self {
        ClientState { engine, provider }
    }

    /// Gives mutable access to the underlying [`StorageProvider`].
    pub fn provider(&mut self) -> &mut SP {
        let Self { provider, .. } = self;
        provider
    }
}
75
76impl<E, SP> ClientState<E, SP>
77where
78    E: Engine,
79    SP: StorageProvider,
80{
81    /// Create a new graph (AKA Team). This graph will start with the initial policy
82    /// provided which must be compatible with the engine E. The `payload` is the initial
83    /// init message that will bootstrap the graph facts. Effects produced when processing
84    /// the payload are emitted to the sink.
85    pub fn new_graph(
86        &mut self,
87        policy_data: &[u8],
88        action: <E::Policy as Policy>::Action<'_>,
89        sink: &mut impl Sink<E::Effect>,
90    ) -> Result<GraphId, ClientError> {
91        let policy_id = self.engine.add_policy(policy_data)?;
92        let policy = self.engine.get_policy(policy_id)?;
93
94        let mut perspective = self.provider.new_perspective(policy_id);
95        sink.begin();
96        policy
97            .call_action(action, &mut perspective, sink, ActionPlacement::OnGraph)
98            .inspect_err(|_| sink.rollback())?;
99        sink.commit();
100
101        let (graph_id, _) = self.provider.new_storage(perspective)?;
102
103        Ok(graph_id)
104    }
105
106    /// Remove a graph (AKA Team). The graph commands will be removed from storage.
107    pub fn remove_graph(&mut self, graph_id: GraphId) -> Result<(), ClientError> {
108        self.provider.remove_storage(graph_id)?;
109
110        Ok(())
111    }
112
113    /// Commit the [`Transaction`] to storage, after merging all temporary heads.
114    pub fn commit(
115        &mut self,
116        trx: &mut Transaction<SP, E>,
117        sink: &mut impl Sink<E::Effect>,
118    ) -> Result<(), ClientError> {
119        trx.commit(&mut self.provider, &mut self.engine, sink)?;
120        Ok(())
121    }
122
123    /// Add commands to the transaction, writing the results to
124    /// `sink`.
125    /// Returns the number of commands that were added.
126    pub fn add_commands(
127        &mut self,
128        trx: &mut Transaction<SP, E>,
129        sink: &mut impl Sink<E::Effect>,
130        commands: &[impl Command],
131    ) -> Result<usize, ClientError> {
132        trx.add_commands(commands, &mut self.provider, &mut self.engine, sink)
133    }
134
135    pub fn update_heads<I>(
136        &mut self,
137        storage_id: GraphId,
138        addrs: I,
139        request_heads: &mut PeerCache,
140    ) -> Result<(), ClientError>
141    where
142        I: IntoIterator<Item = Address>,
143        I::IntoIter: DoubleEndedIterator,
144    {
145        let storage = self.provider.get_storage(storage_id)?;
146
147        // Commands in sync messages are always ancestor-first (lower max_cut to higher max_cut).
148        // Reverse the iterator to process highest max_cut first, which allows us to skip ancestors
149        // since if a command is an ancestor of one we've already added, we don't need to add it.
150        for address in addrs.into_iter().rev() {
151            if let Some(loc) = storage.get_location(address)? {
152                request_heads.add_command(storage, address, loc)?;
153            } else {
154                error!(
155                    "UPDATE_HEADS: Address {:?} does NOT exist in storage, skipping (should not happen if command was successfully added)",
156                    address
157                );
158            }
159        }
160
161        Ok(())
162    }
163
164    /// Returns the address of the head of the graph.
165    pub fn head_address(&mut self, storage_id: GraphId) -> Result<Address, ClientError> {
166        let storage = self.provider.get_storage(storage_id)?;
167        let address = storage.get_head_address()?;
168        Ok(address)
169    }
170
171    /// Performs an `action`, writing the results to `sink`.
172    pub fn action(
173        &mut self,
174        storage_id: GraphId,
175        sink: &mut impl Sink<E::Effect>,
176        action: <E::Policy as Policy>::Action<'_>,
177    ) -> Result<(), ClientError> {
178        let storage = self.provider.get_storage(storage_id)?;
179
180        let head = storage.get_head()?;
181
182        let mut perspective = storage.get_linear_perspective(head)?;
183
184        let policy_id = perspective.policy();
185        let policy = self.engine.get_policy(policy_id)?;
186
187        // No need to checkpoint the perspective since it is only for this action.
188        // Must checkpoint once we add action transactions.
189
190        sink.begin();
191        match policy.call_action(action, &mut perspective, sink, ActionPlacement::OnGraph) {
192            Ok(()) => {
193                let segment = storage.write(perspective)?;
194                storage.commit(segment)?;
195                sink.commit();
196                Ok(())
197            }
198            Err(e) => {
199                sink.rollback();
200                Err(e.into())
201            }
202        }
203    }
204}
205
206impl<E, SP> ClientState<E, SP>
207where
208    SP: StorageProvider,
209{
210    /// Create a new [`Transaction`], used to receive [`Command`]s when syncing.
211    pub fn transaction(&mut self, storage_id: GraphId) -> Transaction<SP, E> {
212        Transaction::new(storage_id)
213    }
214
215    /// Create an ephemeral [`Session`] associated with this client.
216    pub fn session(&mut self, storage_id: GraphId) -> Result<Session<SP, E>, ClientError> {
217        Session::new(&mut self.provider, storage_id)
218    }
219
220    /// Checks if a command with the given address exists in the specified graph.
221    ///
222    /// Returns `true` if the command exists, `false` if it doesn't exist or the graph doesn't exist.
223    /// This method is used to determine if we need to sync when a hello message is received.
224    pub fn command_exists(&mut self, storage_id: GraphId, address: Address) -> bool {
225        let Ok(storage) = self.provider.get_storage(storage_id) else {
226            // Graph doesn't exist
227            return false;
228        };
229        storage.get_location(address).unwrap_or(None).is_some()
230    }
231}