1use std::collections::HashMap;
2use std::ops::Deref;
3use std::pin::Pin;
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use freqfs::DirLock;
8use futures::Future;
9use rjwt::{Token, VerifyingKey};
10use safecast::CastInto;
11use umask::Mode;
12use uuid::Uuid;
13
14use tc_error::*;
15use tc_state::CacheBlock;
16use tc_transact::Gateway;
17use tc_value::{Host, Link, ToUrl, Value};
18use tcgeneric::{label, Id, Label, Map, NetworkTime, PathSegment, TCPathBuf};
19
20use crate::claim::Claim;
21use crate::client::{Client, Egress};
22use crate::{Actor, RPCClient, SignedToken, State};
23
24pub use hypothetical::Hypothetical;
25pub use server::TxnServer;
26pub use tc_transact::{IntoView, Transaction, TxnId};
27
28mod hypothetical;
29mod server;
30
31const PREFIX: Label = label("txn");
32
33pub(super) enum LazyDir {
34 Workspace(DirLock<CacheBlock>),
35 Lazy(Arc<Self>, Id),
36}
37
38impl Clone for LazyDir {
39 fn clone(&self) -> Self {
40 match self {
41 Self::Workspace(workspace) => Self::Workspace(workspace.clone()),
42 Self::Lazy(parent, id) => Self::Lazy(parent.clone(), id.clone()),
43 }
44 }
45}
46
47impl LazyDir {
48 fn get_or_create<'a>(
49 &'a self,
50 txn_id: &'a TxnId,
51 ) -> Pin<Box<dyn Future<Output = TCResult<DirLock<CacheBlock>>> + Send + 'a>> {
52 Box::pin(async move {
53 match self {
54 Self::Workspace(workspace) => {
55 let mut parent = workspace.write().await;
56
57 parent
58 .get_or_create_dir(txn_id.to_string())
59 .map_err(TCError::from)
60 }
61 Self::Lazy(parent, name) => {
62 let parent = parent.get_or_create(txn_id).await?;
63 let mut parent = parent.write().await;
64
65 parent
66 .get_or_create_dir(name.to_string())
67 .map_err(TCError::from)
68 }
69 }
70 })
71 }
72
73 fn create_dir(self, name: Id) -> Self {
74 Self::Lazy(Arc::new(self), name)
75 }
76
77 fn create_dir_unique(self) -> Self {
78 Self::Lazy(Arc::new(self), Uuid::new_v4().into())
79 }
80}
81
82impl From<DirLock<CacheBlock>> for LazyDir {
83 fn from(dir: DirLock<CacheBlock>) -> Self {
84 Self::Workspace(dir)
85 }
86}
87
88pub struct Txn {
89 id: TxnId,
90 expires: NetworkTime,
91 workspace: LazyDir,
92 token: Option<Arc<SignedToken>>,
93 client: Client,
94}
95
96impl Clone for Txn {
97 fn clone(&self) -> Self {
98 Self {
99 id: self.id,
100 expires: self.expires,
101 workspace: self.workspace.clone(),
102 token: self.token.clone(),
103 client: self.client.clone(),
104 }
105 }
106}
107
108impl Txn {
109 pub const DEFAULT_MODE: Mode = Mode::new().with_class_perm(umask::OTHERS, umask::ALL);
110
111 #[inline]
112 fn validate_token(id: TxnId, token: &SignedToken) -> TCResult<()> {
113 let mut owner = None;
114 let mut lock = None;
115
116 for (host, actor_id, claim) in token.claims() {
117 if host.path().is_empty() {
118 return Err(bad_request!(
119 "invalid token: cannot claim reserved path {}",
120 claim.path()
121 ));
122 } else if !claim.path().is_empty() && claim.path()[0] == PREFIX {
123 if claim.path().len() == 2 && id == claim.path()[1] {
124 if claim.mode().has(umask::USER_EXEC) {
125 if owner.is_none() {
126 owner = Some((host, actor_id));
127 } else {
128 return Err(bad_request!("invalid token: multiple owners"));
129 }
130 }
131
132 if claim.mode().has(umask::USER_WRITE) {
133 if lock.is_none() {
134 lock = Some((host, actor_id));
135 } else {
136 return Err(bad_request!("invalid token: multiple locks"));
137 }
138 }
139 } else {
140 return Err(bad_request!(
141 "cannot initialize txn {id} with a token for {}",
142 claim.path()[1]
143 ));
144 }
145 }
146 }
147
148 if let Some(lock) = lock {
149 if let Some(owner) = owner {
150 if owner == lock {
151 } else {
153 return Err(bad_request!("invalid token: lock does not match owner"));
154 }
155 } else {
156 return Err(bad_request!("invalid token: lock with no owner"));
157 }
158 }
159
160 Ok(())
161 }
162
163 pub(super) fn new(
164 id: TxnId,
165 expires: NetworkTime,
166 workspace: LazyDir,
167 client: Client,
168 token: Option<SignedToken>,
169 ) -> TCResult<Self> {
170 if let Some(token) = &token {
171 Self::validate_token(id, token)?;
172 }
173
174 Ok(Self {
175 id,
176 expires,
177 workspace,
178 client,
179 token: token.map(Arc::new),
180 })
181 }
182
183 #[cfg(feature = "service")]
184 pub(crate) fn host(&self) -> &Host {
185 self.client.host()
186 }
187
188 pub(crate) fn token(&self) -> Option<&SignedToken> {
189 self.token.as_ref().map(|token| &**token)
190 }
191
192 pub fn grant(&self, actor: &Actor, link: Link, path: TCPathBuf, mode: Mode) -> TCResult<Self> {
195 #[cfg(debug_assertions)]
196 {
197 let expected = Value::Bytes((*actor.public_key().as_bytes()).into());
198 log::debug!("grant {mode} on {path} to {actor:?} at {link} with public key {expected}");
199 }
200
201 let now = NetworkTime::now();
202 let claim = Claim::new(path, mode);
203
204 let token = if let Some(token) = &self.token {
205 actor.consume_and_sign((**token).clone(), link, claim, now.into())
206 } else {
207 let ttl = self.expires - now;
208 let token = Token::new(link, now.into(), ttl, actor.id().clone(), claim);
209 actor.sign_token(token)
210 }?;
211
212 Ok(Self {
213 id: self.id,
214 expires: self.expires,
215 workspace: self.workspace.clone(),
216 client: self.client.clone(),
217 token: Some(Arc::new(token)),
218 })
219 }
220
221 pub fn mode<Keyring>(&self, _keyring: Keyring, _resource: &[PathSegment]) -> Mode
223 where
224 Keyring: Deref<Target = HashMap<Host, VerifyingKey>>,
225 {
226 let mode = Self::DEFAULT_MODE;
227
228 mode
231 }
232
233 pub fn locked_by(&self) -> TCResult<Option<VerifyingKey>> {
235 let token = if let Some(token) = &self.token {
236 token
237 } else {
238 return Ok(None);
239 };
240
241 let mut claims = token.claims().into_iter();
242
243 let mut locked_by = None;
244 while let Some((_host, actor_id, claim)) = claims.next() {
245 if self.is_txn_path(claim.path()) {
246 if claim.mode().has(umask::USER_WRITE) {
247 locked_by = Some(actor_id);
248 break;
249 }
250 }
251 }
252
253 let mut owned_by = None;
254 while let Some((_host, actor_id, claim)) = claims.next() {
255 if self.is_txn_path(claim.path()) {
256 if claim.mode().has(umask::USER_EXEC) {
257 owned_by = Some(actor_id);
258 }
259 }
260 }
261
262 if let Some(locked_by) = locked_by {
263 let owner = owned_by.ok_or_else(|| internal!("ownerless tranaction"))?;
264
265 if locked_by == owner {
266 let locked_by = Arc::<[u8]>::try_from(locked_by.clone())?;
267
268 VerifyingKey::try_from(&*locked_by)
269 .map_err(|cause| bad_request!("invalid public key for txn leader: {cause}"))
270 .map(Some)
271 } else {
272 Err(internal!("txn locked by non-owner"))
273 }
274 } else {
275 Ok(None)
276 }
277 }
278
279 pub fn claim(self, link: Link, actor: &Actor) -> TCResult<Self> {
280 debug_assert!(self.leader(link.path()).expect("leader").is_none());
281
282 let txn = if self.owner()?.is_none() {
283 self.grant(actor, link.clone(), self.txn_path(), umask::USER_EXEC)?
284 } else {
285 self
286 };
287
288 txn.grant(actor, link, TCPathBuf::default(), umask::USER_EXEC)
289 }
290
291 pub fn lock(self, actor: &Actor) -> TCResult<Self> {
292 let mut owner = None;
293 if let Some(token) = &self.token {
294 for (host, actor_id, claim) in token.claims() {
295 if self.is_txn_path(claim.path()) && claim.mode().has(umask::USER_EXEC) {
296 owner = Some((host, actor_id));
297 }
298 }
299 }
300
301 let link = if let Some((link, public_key)) = owner {
302 assert_eq!(
303 *public_key,
304 Value::Bytes((*actor.public_key().as_bytes()).into())
305 );
306
307 link.clone()
308 } else {
309 panic!("tried to lock a transaction with no owner")
310 };
311
312 self.grant(actor, link, self.txn_path(), umask::USER_WRITE)
313 }
314
315 pub fn has_claims(&self) -> bool {
316 self.token.is_some()
317 }
318
319 pub fn leader(&self, path: &[PathSegment]) -> TCResult<Option<(Link, VerifyingKey)>> {
320 if let Some(token) = &self.token {
321 for (host, actor_id, claim) in token.claims() {
322 if host.path() == path
323 && claim.path().is_empty()
324 && claim.mode().has(umask::USER_EXEC)
325 {
326 let public_key = Arc::<[u8]>::try_from(actor_id.clone())?;
327 let public_key = VerifyingKey::try_from(&*public_key)
328 .map_err(|cause| bad_request!("invalid leader key: {cause}"))?;
329
330 return Ok(Some((host.clone(), public_key)));
331 }
332 }
333 }
334
335 Ok(None)
336 }
337
338 pub fn owner(&self) -> TCResult<Option<VerifyingKey>> {
339 let mut owner = None;
340
341 if let Some(token) = &self.token {
342 for (_host, actor_id, claim) in token.claims() {
343 if self.is_txn_path(claim.path()) && claim.mode().has(umask::USER_EXEC) {
344 owner = Some(actor_id);
345 }
346 }
347 }
348
349 if let Some(owner) = owner {
350 let public_key = Arc::<[u8]>::try_from(owner.clone())?;
351 let public_key = VerifyingKey::try_from(&*public_key)
352 .map_err(|cause| bad_request!("invalid owner key: {cause}"))?;
353
354 Ok(Some(public_key))
355 } else {
356 Ok(None)
357 }
358 }
359
360 pub(crate) fn with_egress(self, egress: Arc<dyn Egress>) -> Self {
361 Self {
362 id: self.id,
363 expires: self.expires,
364 workspace: self.workspace,
365 client: self.client.with_egress(egress),
366 token: self.token,
367 }
368 }
369
370 #[inline]
371 fn txn_path(&self) -> TCPathBuf {
372 [PREFIX.into(), self.id.to_id()].into()
373 }
374
375 #[inline]
376 fn is_txn_path(&self, path: &[PathSegment]) -> bool {
377 path.len() == 2 && PREFIX == path[0] && self.id == path[1]
378 }
379}
380
381#[async_trait]
382impl Transaction<CacheBlock> for Txn {
383 #[inline]
384 fn id(&self) -> &TxnId {
385 &self.id
386 }
387
388 async fn context(&self) -> TCResult<DirLock<CacheBlock>> {
389 self.workspace.get_or_create(&self.id).await
390 }
391
392 fn subcontext<I: Into<Id> + Send>(&self, id: I) -> Self {
393 Txn {
394 id: self.id,
395 expires: self.expires,
396 workspace: self.workspace.clone().create_dir(id.into()),
397 client: self.client.clone(),
398 token: self.token.clone(),
399 }
400 }
401
402 fn subcontext_unique(&self) -> Self {
403 Txn {
404 id: self.id,
405 expires: self.expires,
406 workspace: self.workspace.clone().create_dir_unique(),
407 client: self.client.clone(),
408 token: self.token.clone(),
409 }
410 }
411}
412
413#[async_trait]
414impl Gateway<State> for Txn {
415 async fn get<'a, L, V>(&'a self, link: L, key: V) -> TCResult<State>
416 where
417 L: Into<ToUrl<'a>> + Send,
418 V: CastInto<Value> + Send,
419 {
420 self.client.get(self, link.into(), key.cast_into()).await
421 }
422
423 async fn put<'a, L, K, V>(&'a self, link: L, key: K, value: V) -> TCResult<()>
424 where
425 L: Into<ToUrl<'a>> + Send,
426 K: CastInto<Value> + Send,
427 V: CastInto<State> + Send,
428 {
429 self.client
430 .put(self, link.into(), key.cast_into(), value.cast_into())
431 .await
432 }
433
434 async fn post<'a, L, P>(&'a self, link: L, params: P) -> TCResult<State>
435 where
436 L: Into<ToUrl<'a>> + Send,
437 P: CastInto<Map<State>> + Send,
438 {
439 self.client
440 .post(self, link.into(), params.cast_into())
441 .await
442 }
443
444 async fn delete<'a, L, V>(&'a self, link: L, key: V) -> TCResult<()>
445 where
446 L: Into<ToUrl<'a>> + Send,
447 V: CastInto<Value> + Send,
448 {
449 self.client.delete(self, link.into(), key.cast_into()).await
450 }
451}