1use core::iter::DoubleEndedIterator;
2
3use buggy::Bug;
4use tracing::error;
5
6use crate::{
7 Address, CmdId, Command, GraphId, PeerCache, Perspective as _, Policy, PolicyError,
8 PolicyStore, Sink, Storage as _, StorageError, StorageProvider, policy::ActionPlacement,
9};
10
11mod braiding;
12mod session;
13mod transaction;
14
15pub use self::{session::Session, transaction::Transaction};
16
17#[derive(Debug, thiserror::Error)]
19pub enum ClientError {
20 #[error("no such parent: {0}")]
21 NoSuchParent(CmdId),
22 #[error("policy error: {0}")]
23 PolicyError(PolicyError),
24 #[error("storage error: {0}")]
25 StorageError(#[from] StorageError),
26 #[error("init error")]
27 InitError,
28 #[error("not authorized")]
29 NotAuthorized,
30 #[error("session deserialize error: {0}")]
31 SessionDeserialize(#[from] postcard::Error),
32 #[error("found parallel finalize commands during braid")]
40 ParallelFinalize,
41 #[error(transparent)]
42 Bug(#[from] Bug),
43}
44
45impl From<PolicyError> for ClientError {
46 fn from(error: PolicyError) -> Self {
47 match error {
48 PolicyError::Check => Self::NotAuthorized,
49 _ => Self::PolicyError(error),
50 }
51 }
52}
53
54#[derive(Debug)]
59pub struct ClientState<PS, SP> {
60 policy_store: PS,
61 provider: SP,
62}
63
64impl<PS, SP> ClientState<PS, SP> {
65 pub const fn new(policy_store: PS, provider: SP) -> Self {
67 Self {
68 policy_store,
69 provider,
70 }
71 }
72
73 pub fn provider(&mut self) -> &mut SP {
75 &mut self.provider
76 }
77}
78
79impl<PS, SP> ClientState<PS, SP>
80where
81 PS: PolicyStore,
82 SP: StorageProvider,
83{
84 pub fn new_graph(
89 &mut self,
90 policy_data: &[u8],
91 action: <PS::Policy as Policy>::Action<'_>,
92 sink: &mut impl Sink<PS::Effect>,
93 ) -> Result<GraphId, ClientError> {
94 let policy_id = self.policy_store.add_policy(policy_data)?;
95 let policy = self.policy_store.get_policy(policy_id)?;
96
97 let mut perspective = self.provider.new_perspective(policy_id);
98 sink.begin();
99 policy
100 .call_action(action, &mut perspective, sink, ActionPlacement::OnGraph)
101 .inspect_err(|_| sink.rollback())?;
102 sink.commit();
103
104 let (graph_id, _) = self.provider.new_storage(perspective)?;
105
106 Ok(graph_id)
107 }
108
109 pub fn remove_graph(&mut self, graph_id: GraphId) -> Result<(), ClientError> {
111 self.provider.remove_storage(graph_id)?;
112
113 Ok(())
114 }
115
116 pub fn commit(
118 &mut self,
119 trx: &mut Transaction<SP, PS>,
120 sink: &mut impl Sink<PS::Effect>,
121 ) -> Result<(), ClientError> {
122 trx.commit(&mut self.provider, &mut self.policy_store, sink)?;
123 Ok(())
124 }
125
126 pub fn add_commands(
130 &mut self,
131 trx: &mut Transaction<SP, PS>,
132 sink: &mut impl Sink<PS::Effect>,
133 commands: &[impl Command],
134 ) -> Result<usize, ClientError> {
135 trx.add_commands(commands, &mut self.provider, &mut self.policy_store, sink)
136 }
137
138 pub fn update_heads<I>(
139 &mut self,
140 graph_id: GraphId,
141 addrs: I,
142 request_heads: &mut PeerCache,
143 ) -> Result<(), ClientError>
144 where
145 I: IntoIterator<Item = Address>,
146 I::IntoIter: DoubleEndedIterator,
147 {
148 let storage = self.provider.get_storage(graph_id)?;
149
150 for address in addrs.into_iter().rev() {
154 if let Some(loc) = storage.get_location(address)? {
155 request_heads.add_command(storage, address, loc)?;
156 } else {
157 error!(
158 "UPDATE_HEADS: Address {:?} does NOT exist in storage, skipping (should not happen if command was successfully added)",
159 address
160 );
161 }
162 }
163
164 Ok(())
165 }
166
167 pub fn head_address(&mut self, graph_id: GraphId) -> Result<Address, ClientError> {
169 let storage = self.provider.get_storage(graph_id)?;
170 let address = storage.get_head_address()?;
171 Ok(address)
172 }
173
174 pub fn action(
176 &mut self,
177 graph_id: GraphId,
178 sink: &mut impl Sink<PS::Effect>,
179 action: <PS::Policy as Policy>::Action<'_>,
180 ) -> Result<(), ClientError> {
181 let storage = self.provider.get_storage(graph_id)?;
182
183 let head = storage.get_head()?;
184
185 let mut perspective = storage.get_linear_perspective(head)?;
186
187 let policy_id = perspective.policy();
188 let policy = self.policy_store.get_policy(policy_id)?;
189
190 sink.begin();
194 match policy.call_action(action, &mut perspective, sink, ActionPlacement::OnGraph) {
195 Ok(()) => {
196 let segment = storage.write(perspective)?;
197 storage.commit(segment)?;
198 sink.commit();
199 Ok(())
200 }
201 Err(e) => {
202 sink.rollback();
203 Err(e.into())
204 }
205 }
206 }
207}
208
209impl<PS, SP> ClientState<PS, SP>
210where
211 SP: StorageProvider,
212{
213 pub fn transaction(&mut self, graph_id: GraphId) -> Transaction<SP, PS> {
215 Transaction::new(graph_id)
216 }
217
218 pub fn session(&mut self, graph_id: GraphId) -> Result<Session<SP, PS>, ClientError> {
220 Session::new(&mut self.provider, graph_id)
221 }
222
223 pub fn command_exists(&mut self, graph_id: GraphId, address: Address) -> bool {
228 let Ok(storage) = self.provider.get_storage(graph_id) else {
229 return false;
231 };
232 storage.get_location(address).unwrap_or(None).is_some()
233 }
234}