1use core::iter::DoubleEndedIterator;
2
3use buggy::Bug;
4use tracing::error;
5
6use crate::{
7 Address, CmdId, Command, Engine, EngineError, GraphId, PeerCache, Perspective as _, Policy,
8 Sink, Storage as _, StorageError, StorageProvider, engine::ActionPlacement,
9};
10
11mod braiding;
12mod session;
13mod transaction;
14
15pub use self::{session::Session, transaction::Transaction};
16
17#[derive(Debug, thiserror::Error)]
19pub enum ClientError {
20 #[error("no such parent: {0}")]
21 NoSuchParent(CmdId),
22 #[error("engine error: {0}")]
23 EngineError(EngineError),
24 #[error("storage error: {0}")]
25 StorageError(#[from] StorageError),
26 #[error("init error")]
27 InitError,
28 #[error("not authorized")]
29 NotAuthorized,
30 #[error("session deserialize error: {0}")]
31 SessionDeserialize(#[from] postcard::Error),
32 #[error("found parallel finalize commands during braid")]
40 ParallelFinalize,
41 #[error(transparent)]
42 Bug(#[from] Bug),
43}
44
45impl From<EngineError> for ClientError {
46 fn from(error: EngineError) -> Self {
47 match error {
48 EngineError::Check => Self::NotAuthorized,
49 _ => Self::EngineError(error),
50 }
51 }
52}
53
/// Pairs an engine with a storage provider.
///
/// The methods on this type use `engine` to look up policies and `provider`
/// to reach per-graph storage.
#[derive(Debug)]
pub struct ClientState<E, SP> {
    /// Engine used to register and fetch policies.
    engine: E,
    /// Provider used to create, open and remove graph storage.
    provider: SP,
}
63
64impl<E, SP> ClientState<E, SP> {
65 pub const fn new(engine: E, provider: SP) -> Self {
67 Self { engine, provider }
68 }
69
70 pub fn provider(&mut self) -> &mut SP {
72 &mut self.provider
73 }
74}
75
76impl<E, SP> ClientState<E, SP>
77where
78 E: Engine,
79 SP: StorageProvider,
80{
81 pub fn new_graph(
86 &mut self,
87 policy_data: &[u8],
88 action: <E::Policy as Policy>::Action<'_>,
89 sink: &mut impl Sink<E::Effect>,
90 ) -> Result<GraphId, ClientError> {
91 let policy_id = self.engine.add_policy(policy_data)?;
92 let policy = self.engine.get_policy(policy_id)?;
93
94 let mut perspective = self.provider.new_perspective(policy_id);
95 sink.begin();
96 policy
97 .call_action(action, &mut perspective, sink, ActionPlacement::OnGraph)
98 .inspect_err(|_| sink.rollback())?;
99 sink.commit();
100
101 let (graph_id, _) = self.provider.new_storage(perspective)?;
102
103 Ok(graph_id)
104 }
105
106 pub fn remove_graph(&mut self, graph_id: GraphId) -> Result<(), ClientError> {
108 self.provider.remove_storage(graph_id)?;
109
110 Ok(())
111 }
112
113 pub fn commit(
115 &mut self,
116 trx: &mut Transaction<SP, E>,
117 sink: &mut impl Sink<E::Effect>,
118 ) -> Result<(), ClientError> {
119 trx.commit(&mut self.provider, &mut self.engine, sink)?;
120 Ok(())
121 }
122
123 pub fn add_commands(
127 &mut self,
128 trx: &mut Transaction<SP, E>,
129 sink: &mut impl Sink<E::Effect>,
130 commands: &[impl Command],
131 ) -> Result<usize, ClientError> {
132 trx.add_commands(commands, &mut self.provider, &mut self.engine, sink)
133 }
134
135 pub fn update_heads<I>(
136 &mut self,
137 storage_id: GraphId,
138 addrs: I,
139 request_heads: &mut PeerCache,
140 ) -> Result<(), ClientError>
141 where
142 I: IntoIterator<Item = Address>,
143 I::IntoIter: DoubleEndedIterator,
144 {
145 let storage = self.provider.get_storage(storage_id)?;
146
147 for address in addrs.into_iter().rev() {
151 if let Some(loc) = storage.get_location(address)? {
152 request_heads.add_command(storage, address, loc)?;
153 } else {
154 error!(
155 "UPDATE_HEADS: Address {:?} does NOT exist in storage, skipping (should not happen if command was successfully added)",
156 address
157 );
158 }
159 }
160
161 Ok(())
162 }
163
164 pub fn head_address(&mut self, storage_id: GraphId) -> Result<Address, ClientError> {
166 let storage = self.provider.get_storage(storage_id)?;
167 let address = storage.get_head_address()?;
168 Ok(address)
169 }
170
171 pub fn action(
173 &mut self,
174 storage_id: GraphId,
175 sink: &mut impl Sink<E::Effect>,
176 action: <E::Policy as Policy>::Action<'_>,
177 ) -> Result<(), ClientError> {
178 let storage = self.provider.get_storage(storage_id)?;
179
180 let head = storage.get_head()?;
181
182 let mut perspective = storage.get_linear_perspective(head)?;
183
184 let policy_id = perspective.policy();
185 let policy = self.engine.get_policy(policy_id)?;
186
187 sink.begin();
191 match policy.call_action(action, &mut perspective, sink, ActionPlacement::OnGraph) {
192 Ok(()) => {
193 let segment = storage.write(perspective)?;
194 storage.commit(segment)?;
195 sink.commit();
196 Ok(())
197 }
198 Err(e) => {
199 sink.rollback();
200 Err(e.into())
201 }
202 }
203 }
204}
205
206impl<E, SP> ClientState<E, SP>
207where
208 SP: StorageProvider,
209{
210 pub fn transaction(&mut self, storage_id: GraphId) -> Transaction<SP, E> {
212 Transaction::new(storage_id)
213 }
214
215 pub fn session(&mut self, storage_id: GraphId) -> Result<Session<SP, E>, ClientError> {
217 Session::new(&mut self.provider, storage_id)
218 }
219
220 pub fn command_exists(&mut self, storage_id: GraphId, address: Address) -> bool {
225 let Ok(storage) = self.provider.get_storage(storage_id) else {
226 return false;
228 };
229 storage.get_location(address).unwrap_or(None).is_some()
230 }
231}