tc_server/txn/
mod.rs

1use std::collections::HashMap;
2use std::ops::Deref;
3use std::pin::Pin;
4use std::sync::Arc;
5
6use async_trait::async_trait;
7use freqfs::DirLock;
8use futures::Future;
9use rjwt::{Token, VerifyingKey};
10use safecast::CastInto;
11use umask::Mode;
12use uuid::Uuid;
13
14use tc_error::*;
15use tc_state::CacheBlock;
16use tc_transact::Gateway;
17use tc_value::{Host, Link, ToUrl, Value};
18use tcgeneric::{label, Id, Label, Map, NetworkTime, PathSegment, TCPathBuf};
19
20use crate::claim::Claim;
21use crate::client::{Client, Egress};
22use crate::{Actor, RPCClient, SignedToken, State};
23
24pub use hypothetical::Hypothetical;
25pub use server::TxnServer;
26pub use tc_transact::{IntoView, Transaction, TxnId};
27
28mod hypothetical;
29mod server;
30
/// The first segment of a transaction-scoped claim path (`/txn/<txn id>`).
const PREFIX: Label = label("txn");
32
33pub(super) enum LazyDir {
34    Workspace(DirLock<CacheBlock>),
35    Lazy(Arc<Self>, Id),
36}
37
38impl Clone for LazyDir {
39    fn clone(&self) -> Self {
40        match self {
41            Self::Workspace(workspace) => Self::Workspace(workspace.clone()),
42            Self::Lazy(parent, id) => Self::Lazy(parent.clone(), id.clone()),
43        }
44    }
45}
46
47impl LazyDir {
48    fn get_or_create<'a>(
49        &'a self,
50        txn_id: &'a TxnId,
51    ) -> Pin<Box<dyn Future<Output = TCResult<DirLock<CacheBlock>>> + Send + 'a>> {
52        Box::pin(async move {
53            match self {
54                Self::Workspace(workspace) => {
55                    let mut parent = workspace.write().await;
56
57                    parent
58                        .get_or_create_dir(txn_id.to_string())
59                        .map_err(TCError::from)
60                }
61                Self::Lazy(parent, name) => {
62                    let parent = parent.get_or_create(txn_id).await?;
63                    let mut parent = parent.write().await;
64
65                    parent
66                        .get_or_create_dir(name.to_string())
67                        .map_err(TCError::from)
68                }
69            }
70        })
71    }
72
73    fn create_dir(self, name: Id) -> Self {
74        Self::Lazy(Arc::new(self), name)
75    }
76
77    fn create_dir_unique(self) -> Self {
78        Self::Lazy(Arc::new(self), Uuid::new_v4().into())
79    }
80}
81
82impl From<DirLock<CacheBlock>> for LazyDir {
83    fn from(dir: DirLock<CacheBlock>) -> Self {
84        Self::Workspace(dir)
85    }
86}
87
88pub struct Txn {
89    id: TxnId,
90    expires: NetworkTime,
91    workspace: LazyDir,
92    token: Option<Arc<SignedToken>>,
93    client: Client,
94}
95
96impl Clone for Txn {
97    fn clone(&self) -> Self {
98        Self {
99            id: self.id,
100            expires: self.expires,
101            workspace: self.workspace.clone(),
102            token: self.token.clone(),
103            client: self.client.clone(),
104        }
105    }
106}
107
108impl Txn {
    /// The baseline permission set: every permission bit for the `OTHERS` class.
    pub const DEFAULT_MODE: Mode = Mode::new().with_class_perm(umask::OTHERS, umask::ALL);
110
111    #[inline]
112    fn validate_token(id: TxnId, token: &SignedToken) -> TCResult<()> {
113        let mut owner = None;
114        let mut lock = None;
115
116        for (host, actor_id, claim) in token.claims() {
117            if host.path().is_empty() {
118                return Err(bad_request!(
119                    "invalid token: cannot claim reserved path {}",
120                    claim.path()
121                ));
122            } else if !claim.path().is_empty() && claim.path()[0] == PREFIX {
123                if claim.path().len() == 2 && id == claim.path()[1] {
124                    if claim.mode().has(umask::USER_EXEC) {
125                        if owner.is_none() {
126                            owner = Some((host, actor_id));
127                        } else {
128                            return Err(bad_request!("invalid token: multiple owners"));
129                        }
130                    }
131
132                    if claim.mode().has(umask::USER_WRITE) {
133                        if lock.is_none() {
134                            lock = Some((host, actor_id));
135                        } else {
136                            return Err(bad_request!("invalid token: multiple locks"));
137                        }
138                    }
139                } else {
140                    return Err(bad_request!(
141                        "cannot initialize txn {id} with a token for {}",
142                        claim.path()[1]
143                    ));
144                }
145            }
146        }
147
148        if let Some(lock) = lock {
149            if let Some(owner) = owner {
150                if owner == lock {
151                    // pass
152                } else {
153                    return Err(bad_request!("invalid token: lock does not match owner"));
154                }
155            } else {
156                return Err(bad_request!("invalid token: lock with no owner"));
157            }
158        }
159
160        Ok(())
161    }
162
163    pub(super) fn new(
164        id: TxnId,
165        expires: NetworkTime,
166        workspace: LazyDir,
167        client: Client,
168        token: Option<SignedToken>,
169    ) -> TCResult<Self> {
170        if let Some(token) = &token {
171            Self::validate_token(id, token)?;
172        }
173
174        Ok(Self {
175            id,
176            expires,
177            workspace,
178            client,
179            token: token.map(Arc::new),
180        })
181    }
182
183    #[cfg(feature = "service")]
184    pub(crate) fn host(&self) -> &Host {
185        self.client.host()
186    }
187
188    pub(crate) fn token(&self) -> Option<&SignedToken> {
189        self.token.as_ref().map(|token| &**token)
190    }
191
192    /// Grant `mode` permissions on the resource at `path` to the bearer of this [`Txn`]'s token.
193    /// `path` is relative to the cluster at `link` whose `actor` will sign the token.
194    pub fn grant(&self, actor: &Actor, link: Link, path: TCPathBuf, mode: Mode) -> TCResult<Self> {
195        #[cfg(debug_assertions)]
196        {
197            let expected = Value::Bytes((*actor.public_key().as_bytes()).into());
198            log::debug!("grant {mode} on {path} to {actor:?} at {link} with public key {expected}");
199        }
200
201        let now = NetworkTime::now();
202        let claim = Claim::new(path, mode);
203
204        let token = if let Some(token) = &self.token {
205            actor.consume_and_sign((**token).clone(), link, claim, now.into())
206        } else {
207            let ttl = self.expires - now;
208            let token = Token::new(link, now.into(), ttl, actor.id().clone(), claim);
209            actor.sign_token(token)
210        }?;
211
212        Ok(Self {
213            id: self.id,
214            expires: self.expires,
215            workspace: self.workspace.clone(),
216            client: self.client.clone(),
217            token: Some(Arc::new(token)),
218        })
219    }
220
221    /// Get the set of permissions authorized by hosts in the `keyring` for the given `resource`.
222    pub fn mode<Keyring>(&self, _keyring: Keyring, _resource: &[PathSegment]) -> Mode
223    where
224        Keyring: Deref<Target = HashMap<Host, VerifyingKey>>,
225    {
226        let mode = Self::DEFAULT_MODE;
227
228        // TODO: add user & group permissions
229
230        mode
231    }
232
233    // TODO: require a write bit for commits and a read bit for rollbacks
234    pub fn locked_by(&self) -> TCResult<Option<VerifyingKey>> {
235        let token = if let Some(token) = &self.token {
236            token
237        } else {
238            return Ok(None);
239        };
240
241        let mut claims = token.claims().into_iter();
242
243        let mut locked_by = None;
244        while let Some((_host, actor_id, claim)) = claims.next() {
245            if self.is_txn_path(claim.path()) {
246                if claim.mode().has(umask::USER_WRITE) {
247                    locked_by = Some(actor_id);
248                    break;
249                }
250            }
251        }
252
253        let mut owned_by = None;
254        while let Some((_host, actor_id, claim)) = claims.next() {
255            if self.is_txn_path(claim.path()) {
256                if claim.mode().has(umask::USER_EXEC) {
257                    owned_by = Some(actor_id);
258                }
259            }
260        }
261
262        if let Some(locked_by) = locked_by {
263            let owner = owned_by.ok_or_else(|| internal!("ownerless tranaction"))?;
264
265            if locked_by == owner {
266                let locked_by = Arc::<[u8]>::try_from(locked_by.clone())?;
267
268                VerifyingKey::try_from(&*locked_by)
269                    .map_err(|cause| bad_request!("invalid public key for txn leader: {cause}"))
270                    .map(Some)
271            } else {
272                Err(internal!("txn locked by non-owner"))
273            }
274        } else {
275            Ok(None)
276        }
277    }
278
279    pub fn claim(self, link: Link, actor: &Actor) -> TCResult<Self> {
280        debug_assert!(self.leader(link.path()).expect("leader").is_none());
281
282        let txn = if self.owner()?.is_none() {
283            self.grant(actor, link.clone(), self.txn_path(), umask::USER_EXEC)?
284        } else {
285            self
286        };
287
288        txn.grant(actor, link, TCPathBuf::default(), umask::USER_EXEC)
289    }
290
291    pub fn lock(self, actor: &Actor) -> TCResult<Self> {
292        let mut owner = None;
293        if let Some(token) = &self.token {
294            for (host, actor_id, claim) in token.claims() {
295                if self.is_txn_path(claim.path()) && claim.mode().has(umask::USER_EXEC) {
296                    owner = Some((host, actor_id));
297                }
298            }
299        }
300
301        let link = if let Some((link, public_key)) = owner {
302            assert_eq!(
303                *public_key,
304                Value::Bytes((*actor.public_key().as_bytes()).into())
305            );
306
307            link.clone()
308        } else {
309            panic!("tried to lock a transaction with no owner")
310        };
311
312        self.grant(actor, link, self.txn_path(), umask::USER_WRITE)
313    }
314
315    pub fn has_claims(&self) -> bool {
316        self.token.is_some()
317    }
318
319    pub fn leader(&self, path: &[PathSegment]) -> TCResult<Option<(Link, VerifyingKey)>> {
320        if let Some(token) = &self.token {
321            for (host, actor_id, claim) in token.claims() {
322                if host.path() == path
323                    && claim.path().is_empty()
324                    && claim.mode().has(umask::USER_EXEC)
325                {
326                    let public_key = Arc::<[u8]>::try_from(actor_id.clone())?;
327                    let public_key = VerifyingKey::try_from(&*public_key)
328                        .map_err(|cause| bad_request!("invalid leader key: {cause}"))?;
329
330                    return Ok(Some((host.clone(), public_key)));
331                }
332            }
333        }
334
335        Ok(None)
336    }
337
338    pub fn owner(&self) -> TCResult<Option<VerifyingKey>> {
339        let mut owner = None;
340
341        if let Some(token) = &self.token {
342            for (_host, actor_id, claim) in token.claims() {
343                if self.is_txn_path(claim.path()) && claim.mode().has(umask::USER_EXEC) {
344                    owner = Some(actor_id);
345                }
346            }
347        }
348
349        if let Some(owner) = owner {
350            let public_key = Arc::<[u8]>::try_from(owner.clone())?;
351            let public_key = VerifyingKey::try_from(&*public_key)
352                .map_err(|cause| bad_request!("invalid owner key: {cause}"))?;
353
354            Ok(Some(public_key))
355        } else {
356            Ok(None)
357        }
358    }
359
360    pub(crate) fn with_egress(self, egress: Arc<dyn Egress>) -> Self {
361        Self {
362            id: self.id,
363            expires: self.expires,
364            workspace: self.workspace,
365            client: self.client.with_egress(egress),
366            token: self.token,
367        }
368    }
369
370    #[inline]
371    fn txn_path(&self) -> TCPathBuf {
372        [PREFIX.into(), self.id.to_id()].into()
373    }
374
375    #[inline]
376    fn is_txn_path(&self, path: &[PathSegment]) -> bool {
377        path.len() == 2 && PREFIX == path[0] && self.id == path[1]
378    }
379}
380
381#[async_trait]
382impl Transaction<CacheBlock> for Txn {
383    #[inline]
384    fn id(&self) -> &TxnId {
385        &self.id
386    }
387
388    async fn context(&self) -> TCResult<DirLock<CacheBlock>> {
389        self.workspace.get_or_create(&self.id).await
390    }
391
392    fn subcontext<I: Into<Id> + Send>(&self, id: I) -> Self {
393        Txn {
394            id: self.id,
395            expires: self.expires,
396            workspace: self.workspace.clone().create_dir(id.into()),
397            client: self.client.clone(),
398            token: self.token.clone(),
399        }
400    }
401
402    fn subcontext_unique(&self) -> Self {
403        Txn {
404            id: self.id,
405            expires: self.expires,
406            workspace: self.workspace.clone().create_dir_unique(),
407            client: self.client.clone(),
408            token: self.token.clone(),
409        }
410    }
411}
412
413#[async_trait]
414impl Gateway<State> for Txn {
415    async fn get<'a, L, V>(&'a self, link: L, key: V) -> TCResult<State>
416    where
417        L: Into<ToUrl<'a>> + Send,
418        V: CastInto<Value> + Send,
419    {
420        self.client.get(self, link.into(), key.cast_into()).await
421    }
422
423    async fn put<'a, L, K, V>(&'a self, link: L, key: K, value: V) -> TCResult<()>
424    where
425        L: Into<ToUrl<'a>> + Send,
426        K: CastInto<Value> + Send,
427        V: CastInto<State> + Send,
428    {
429        self.client
430            .put(self, link.into(), key.cast_into(), value.cast_into())
431            .await
432    }
433
434    async fn post<'a, L, P>(&'a self, link: L, params: P) -> TCResult<State>
435    where
436        L: Into<ToUrl<'a>> + Send,
437        P: CastInto<Map<State>> + Send,
438    {
439        self.client
440            .post(self, link.into(), params.cast_into())
441            .await
442    }
443
444    async fn delete<'a, L, V>(&'a self, link: L, key: V) -> TCResult<()>
445    where
446        L: Into<ToUrl<'a>> + Send,
447        V: CastInto<Value> + Send,
448    {
449        self.client.delete(self, link.into(), key.cast_into()).await
450    }
451}