Skip to main content

aranya_runtime/
client.rs

1use core::iter::DoubleEndedIterator;
2
3use buggy::Bug;
4use tracing::error;
5
6use crate::{
7    Address, CmdId, Command, GraphId, PeerCache, Perspective as _, Policy, PolicyError,
8    PolicyStore, Sink, Storage as _, StorageError, StorageProvider, policy::ActionPlacement,
9};
10
11mod braiding;
12mod session;
13mod transaction;
14
15pub use self::{session::Session, transaction::Transaction};
16
17/// An error returned by the runtime client.
18#[derive(Debug, thiserror::Error)]
19pub enum ClientError {
20    #[error("no such parent: {0}")]
21    NoSuchParent(CmdId),
22    #[error("policy error: {0}")]
23    PolicyError(PolicyError),
24    #[error("storage error: {0}")]
25    StorageError(#[from] StorageError),
26    #[error("init error")]
27    InitError,
28    #[error("not authorized")]
29    NotAuthorized,
30    #[error("session deserialize error: {0}")]
31    SessionDeserialize(#[from] postcard::Error),
32    /// Attempted to braid two parallel finalize commands together.
33    ///
34    /// Policy must be designed such that two parallel finalize commands are never produced.
35    ///
36    /// Currently, this is practically an unrecoverable error. You must wipe all graphs containing
37    /// the "bad" finalize command and resync from the "good" clients. Otherwise, your network will
38    /// split into two separate graph states which can never successfully sync.
39    #[error("found parallel finalize commands during braid")]
40    ParallelFinalize,
41    #[error(transparent)]
42    Bug(#[from] Bug),
43}
44
45impl From<PolicyError> for ClientError {
46    fn from(error: PolicyError) -> Self {
47        match error {
48            PolicyError::Check => Self::NotAuthorized,
49            _ => Self::PolicyError(error),
50        }
51    }
52}
53
54/// Keeps track of client graph state.
55///
56/// - `PS` should be an implementation of [`PolicyStore`].
57/// - `SP` should be an implementation of [`StorageProvider`].
58#[derive(Debug)]
59pub struct ClientState<PS, SP> {
60    policy_store: PS,
61    provider: SP,
62}
63
64impl<PS, SP> ClientState<PS, SP> {
65    /// Creates a `ClientState`.
66    pub const fn new(policy_store: PS, provider: SP) -> Self {
67        Self {
68            policy_store,
69            provider,
70        }
71    }
72
73    /// Provide access to the [`StorageProvider`].
74    pub fn provider(&mut self) -> &mut SP {
75        &mut self.provider
76    }
77}
78
79impl<PS, SP> ClientState<PS, SP>
80where
81    PS: PolicyStore,
82    SP: StorageProvider,
83{
84    /// Create a new graph (AKA Team). This graph will start with the initial policy
85    /// provided which must be compatible with the policy store PS. The `payload` is the initial
86    /// init message that will bootstrap the graph facts. Effects produced when processing
87    /// the payload are emitted to the sink.
88    pub fn new_graph(
89        &mut self,
90        policy_data: &[u8],
91        action: <PS::Policy as Policy>::Action<'_>,
92        sink: &mut impl Sink<PS::Effect>,
93    ) -> Result<GraphId, ClientError> {
94        let policy_id = self.policy_store.add_policy(policy_data)?;
95        let policy = self.policy_store.get_policy(policy_id)?;
96
97        let mut perspective = self.provider.new_perspective(policy_id);
98        sink.begin();
99        policy
100            .call_action(action, &mut perspective, sink, ActionPlacement::OnGraph)
101            .inspect_err(|_| sink.rollback())?;
102        sink.commit();
103
104        let (graph_id, _) = self.provider.new_storage(perspective)?;
105
106        Ok(graph_id)
107    }
108
109    /// Remove a graph (AKA Team). The graph commands will be removed from storage.
110    pub fn remove_graph(&mut self, graph_id: GraphId) -> Result<(), ClientError> {
111        self.provider.remove_storage(graph_id)?;
112
113        Ok(())
114    }
115
116    /// Commit the [`Transaction`] to storage, after merging all temporary heads.
117    pub fn commit(
118        &mut self,
119        trx: &mut Transaction<SP, PS>,
120        sink: &mut impl Sink<PS::Effect>,
121    ) -> Result<(), ClientError> {
122        trx.commit(&mut self.provider, &mut self.policy_store, sink)?;
123        Ok(())
124    }
125
126    /// Add commands to the transaction, writing the results to
127    /// `sink`.
128    /// Returns the number of commands that were added.
129    pub fn add_commands(
130        &mut self,
131        trx: &mut Transaction<SP, PS>,
132        sink: &mut impl Sink<PS::Effect>,
133        commands: &[impl Command],
134    ) -> Result<usize, ClientError> {
135        trx.add_commands(commands, &mut self.provider, &mut self.policy_store, sink)
136    }
137
138    pub fn update_heads<I>(
139        &mut self,
140        graph_id: GraphId,
141        addrs: I,
142        request_heads: &mut PeerCache,
143    ) -> Result<(), ClientError>
144    where
145        I: IntoIterator<Item = Address>,
146        I::IntoIter: DoubleEndedIterator,
147    {
148        let storage = self.provider.get_storage(graph_id)?;
149
150        // Commands in sync messages are always ancestor-first (lower max_cut to higher max_cut).
151        // Reverse the iterator to process highest max_cut first, which allows us to skip ancestors
152        // since if a command is an ancestor of one we've already added, we don't need to add it.
153        for address in addrs.into_iter().rev() {
154            if let Some(loc) = storage.get_location(address)? {
155                request_heads.add_command(storage, address, loc)?;
156            } else {
157                error!(
158                    "UPDATE_HEADS: Address {:?} does NOT exist in storage, skipping (should not happen if command was successfully added)",
159                    address
160                );
161            }
162        }
163
164        Ok(())
165    }
166
167    /// Returns the address of the head of the graph.
168    pub fn head_address(&mut self, graph_id: GraphId) -> Result<Address, ClientError> {
169        let storage = self.provider.get_storage(graph_id)?;
170        let address = storage.get_head_address()?;
171        Ok(address)
172    }
173
174    /// Performs an `action`, writing the results to `sink`.
175    pub fn action(
176        &mut self,
177        graph_id: GraphId,
178        sink: &mut impl Sink<PS::Effect>,
179        action: <PS::Policy as Policy>::Action<'_>,
180    ) -> Result<(), ClientError> {
181        let storage = self.provider.get_storage(graph_id)?;
182
183        let head = storage.get_head()?;
184
185        let mut perspective = storage.get_linear_perspective(head)?;
186
187        let policy_id = perspective.policy();
188        let policy = self.policy_store.get_policy(policy_id)?;
189
190        // No need to checkpoint the perspective since it is only for this action.
191        // Must checkpoint once we add action transactions.
192
193        sink.begin();
194        match policy.call_action(action, &mut perspective, sink, ActionPlacement::OnGraph) {
195            Ok(()) => {
196                let segment = storage.write(perspective)?;
197                storage.commit(segment)?;
198                sink.commit();
199                Ok(())
200            }
201            Err(e) => {
202                sink.rollback();
203                Err(e.into())
204            }
205        }
206    }
207}
208
209impl<PS, SP> ClientState<PS, SP>
210where
211    SP: StorageProvider,
212{
213    /// Create a new [`Transaction`], used to receive [`Command`]s when syncing.
214    pub fn transaction(&mut self, graph_id: GraphId) -> Transaction<SP, PS> {
215        Transaction::new(graph_id)
216    }
217
218    /// Create an ephemeral [`Session`] associated with this client.
219    pub fn session(&mut self, graph_id: GraphId) -> Result<Session<SP, PS>, ClientError> {
220        Session::new(&mut self.provider, graph_id)
221    }
222
223    /// Checks if a command with the given address exists in the specified graph.
224    ///
225    /// Returns `true` if the command exists, `false` if it doesn't exist or the graph doesn't exist.
226    /// This method is used to determine if we need to sync when a hello message is received.
227    pub fn command_exists(&mut self, graph_id: GraphId, address: Address) -> bool {
228        let Ok(storage) = self.provider.get_storage(graph_id) else {
229            // Graph doesn't exist
230            return false;
231        };
232        storage.get_location(address).unwrap_or(None).is_some()
233    }
234}