// aranya_runtime/client.rs

1use buggy::Bug;
2
3use crate::{
4    Address, CmdId, Command, Engine, EngineError, GraphId, PeerCache, Perspective as _, Policy,
5    Sink, Storage as _, StorageError, StorageProvider, engine::ActionPlacement,
6};
7
8mod braiding;
9mod session;
10mod transaction;
11
12pub use self::{session::Session, transaction::Transaction};
13
/// An error returned by the runtime client.
#[derive(Debug, thiserror::Error)]
pub enum ClientError {
    /// A command referenced a parent that does not exist in storage.
    #[error("no such parent: {0}")]
    NoSuchParent(CmdId),
    /// The policy engine reported an error other than a failed policy check.
    /// (Failed checks are mapped to [`ClientError::NotAuthorized`] instead;
    /// see the `From<EngineError>` impl below.)
    #[error("engine error: {0}")]
    EngineError(EngineError),
    /// The storage layer reported an error.
    #[error("storage error: {0}")]
    StorageError(#[from] StorageError),
    /// Graph initialization failed.
    #[error("init error")]
    InitError,
    /// A policy check rejected the operation ([`EngineError::Check`]).
    #[error("not authorized")]
    NotAuthorized,
    /// A session command could not be deserialized (postcard decoding failed).
    #[error("session deserialize error: {0}")]
    SessionDeserialize(#[from] postcard::Error),
    /// Attempted to braid two parallel finalize commands together.
    ///
    /// Policy must be designed such that two parallel finalize commands are never produced.
    ///
    /// Currently, this is practically an unrecoverable error. You must wipe all graphs containing
    /// the "bad" finalize command and resync from the "good" clients. Otherwise, your network will
    /// split into two separate graph states which can never successfully sync.
    #[error("found parallel finalize commands during braid")]
    ParallelFinalize,
    /// An internal invariant was violated ([`Bug`]).
    #[error(transparent)]
    Bug(#[from] Bug),
}
41
42impl From<EngineError> for ClientError {
43    fn from(error: EngineError) -> Self {
44        match error {
45            EngineError::Check => Self::NotAuthorized,
46            _ => Self::EngineError(error),
47        }
48    }
49}
50
/// Keeps track of client graph state.
///
/// - `E` should be an implementation of [`Engine`].
/// - `SP` should be an implementation of [`StorageProvider`].
#[derive(Debug)]
pub struct ClientState<E, SP> {
    /// Policy engine used to evaluate actions and commands.
    engine: E,
    /// Storage provider holding the graphs this client tracks.
    provider: SP,
}
60
61impl<E, SP> ClientState<E, SP> {
62    /// Creates a `ClientState`.
63    pub const fn new(engine: E, provider: SP) -> Self {
64        Self { engine, provider }
65    }
66
67    /// Provide access to the [`StorageProvider`].
68    pub fn provider(&mut self) -> &mut SP {
69        &mut self.provider
70    }
71}
72
73impl<E, SP> ClientState<E, SP>
74where
75    E: Engine,
76    SP: StorageProvider,
77{
78    /// Create a new graph (AKA Team). This graph will start with the initial policy
79    /// provided which must be compatible with the engine E. The `payload` is the initial
80    /// init message that will bootstrap the graph facts. Effects produced when processing
81    /// the payload are emitted to the sink.
82    pub fn new_graph(
83        &mut self,
84        policy_data: &[u8],
85        action: <E::Policy as Policy>::Action<'_>,
86        sink: &mut impl Sink<E::Effect>,
87    ) -> Result<GraphId, ClientError> {
88        let policy_id = self.engine.add_policy(policy_data)?;
89        let policy = self.engine.get_policy(policy_id)?;
90
91        let mut perspective = self.provider.new_perspective(policy_id);
92        sink.begin();
93        policy
94            .call_action(action, &mut perspective, sink, ActionPlacement::OnGraph)
95            .inspect_err(|_| sink.rollback())?;
96        sink.commit();
97
98        let (graph_id, _) = self.provider.new_storage(perspective)?;
99
100        Ok(graph_id)
101    }
102
103    /// Remove a graph (AKA Team). The graph commands will be removed from storage.
104    pub fn remove_graph(&mut self, graph_id: GraphId) -> Result<(), ClientError> {
105        self.provider.remove_storage(graph_id)?;
106
107        Ok(())
108    }
109
110    /// Commit the [`Transaction`] to storage, after merging all temporary heads.
111    pub fn commit(
112        &mut self,
113        trx: &mut Transaction<SP, E>,
114        sink: &mut impl Sink<E::Effect>,
115    ) -> Result<(), ClientError> {
116        trx.commit(&mut self.provider, &mut self.engine, sink)?;
117        Ok(())
118    }
119
120    /// Add commands to the transaction, writing the results to
121    /// `sink`.
122    /// Returns the number of commands that were added.
123    pub fn add_commands(
124        &mut self,
125        trx: &mut Transaction<SP, E>,
126        sink: &mut impl Sink<E::Effect>,
127        commands: &[impl Command],
128    ) -> Result<usize, ClientError> {
129        let count = trx.add_commands(commands, &mut self.provider, &mut self.engine, sink)?;
130        Ok(count)
131    }
132
133    pub fn update_heads(
134        &mut self,
135        storage_id: GraphId,
136        addrs: impl IntoIterator<Item = Address>,
137        request_heads: &mut PeerCache,
138    ) -> Result<(), ClientError> {
139        let storage = self.provider.get_storage(storage_id)?;
140        for address in addrs {
141            if let Some(loc) = storage.get_location(address)? {
142                request_heads.add_command(storage, address, loc)?;
143            }
144        }
145        Ok(())
146    }
147
148    /// Returns the address of the head of the graph.
149    pub fn head_address(&mut self, storage_id: GraphId) -> Result<Address, ClientError> {
150        let storage = self.provider.get_storage(storage_id)?;
151        let address = storage.get_head_address()?;
152        Ok(address)
153    }
154
155    /// Performs an `action`, writing the results to `sink`.
156    pub fn action(
157        &mut self,
158        storage_id: GraphId,
159        sink: &mut impl Sink<E::Effect>,
160        action: <E::Policy as Policy>::Action<'_>,
161    ) -> Result<(), ClientError> {
162        let storage = self.provider.get_storage(storage_id)?;
163
164        let head = storage.get_head()?;
165
166        let mut perspective = storage.get_linear_perspective(head)?;
167
168        let policy_id = perspective.policy();
169        let policy = self.engine.get_policy(policy_id)?;
170
171        // No need to checkpoint the perspective since it is only for this action.
172        // Must checkpoint once we add action transactions.
173
174        sink.begin();
175        match policy.call_action(action, &mut perspective, sink, ActionPlacement::OnGraph) {
176            Ok(()) => {
177                let segment = storage.write(perspective)?;
178                storage.commit(segment)?;
179                sink.commit();
180                Ok(())
181            }
182            Err(e) => {
183                sink.rollback();
184                Err(e.into())
185            }
186        }
187    }
188}
189
190impl<E, SP> ClientState<E, SP>
191where
192    SP: StorageProvider,
193{
194    /// Create a new [`Transaction`], used to receive [`Command`]s when syncing.
195    pub fn transaction(&mut self, storage_id: GraphId) -> Transaction<SP, E> {
196        Transaction::new(storage_id)
197    }
198
199    /// Create an ephemeral [`Session`] associated with this client.
200    pub fn session(&mut self, storage_id: GraphId) -> Result<Session<SP, E>, ClientError> {
201        Session::new(&mut self.provider, storage_id)
202    }
203
204    /// Checks if a command with the given address exists in the specified graph.
205    ///
206    /// Returns `true` if the command exists, `false` if it doesn't exist or the graph doesn't exist.
207    /// This method is used to determine if we need to sync when a hello message is received.
208    pub fn command_exists(&mut self, storage_id: GraphId, address: Address) -> bool {
209        let Ok(storage) = self.provider.get_storage(storage_id) else {
210            // Graph doesn't exist
211            return false;
212        };
213        storage.get_location(address).unwrap_or(None).is_some()
214    }
215}