tc_server/
kernel.rs

1use std::collections::{BTreeSet, HashSet, VecDeque};
2use std::fmt;
3use std::sync::Arc;
4use std::time::Duration;
5
6use aes_gcm_siv::aead::rand_core::{OsRng, RngCore};
7use aes_gcm_siv::aead::Aead;
8use aes_gcm_siv::{Aes256GcmSiv, KeyInit};
9use async_trait::async_trait;
10use futures::join;
11use log::{debug, info, trace, warn};
12use rand::prelude::IteratorRandom;
13use rjwt::VerifyingKey;
14use safecast::TryCastInto;
15use umask::Mode;
16
17use tc_error::*;
18use tc_scalar::OpRefType;
19#[cfg(feature = "service")]
20use tc_state::chain::Recover;
21use tc_state::CacheBlock;
22use tc_transact::hash::AsyncHash;
23use tc_transact::public::*;
24use tc_transact::{fs, Gateway, Replicate, Transact, Transaction, TxnId};
25use tc_value::{Host, Link, ToUrl, Value};
26use tcgeneric::{label, Label, Map, NetworkTime, PathSegment, TCPath, TCPathBuf, Tuple};
27
28use crate::client::Egress;
29#[cfg(feature = "service")]
30use crate::cluster::Service;
31use crate::cluster::{Class, Cluster, Dir, DirEntry, IsDir, Library, ReplicateAndJoin};
32use crate::txn::{Hypothetical, Txn, TxnServer};
33use crate::{aes256, cluster, Authorize, SignedToken, State};
34
35pub const CLASS: Label = label("class");
36pub const LIB: Label = label("lib");
37pub const SERVICE: Label = label("service");
38const REPLICATION_TTL: Duration = Duration::from_secs(30);
39const STATE_MODE: Mode = Mode::new()
40    .with_class_perm(umask::OTHERS, umask::READ)
41    .with_class_perm(umask::OTHERS, umask::EXEC);
42
43#[cfg(not(feature = "service"))]
44const ERR_NOT_ENABLED: &str = "this binary was compiled without the 'service' feature";
45
46type Nonce = [u8; 12];
47
48struct KernelEgress;
49
50impl Egress for KernelEgress {
51    fn is_authorized(&self, _link: &ToUrl<'_>, _write: bool) -> bool {
52        // TODO: implement authorization logic
53        true
54    }
55}
56
57impl fmt::Debug for KernelEgress {
58    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
59        f.write_str("kernel egress policy")
60    }
61}
62
63pub struct Endpoint<'a> {
64    mode: Mode,
65    txn: &'a Txn,
66    path: &'a [PathSegment],
67    handler: Box<dyn Handler<'a, State> + 'a>,
68}
69
70impl<'a> Endpoint<'a> {
71    pub fn umask(&self) -> Mode {
72        self.mode
73    }
74
75    pub fn get(self, key: Value) -> TCResult<GetFuture<'a, State>> {
76        let get = self
77            .handler
78            .get()
79            .ok_or_else(|| TCError::method_not_allowed(OpRefType::Get, TCPath::from(self.path)))?;
80
81        if self.mode.may_read() {
82            Ok((get)(self.txn, key))
83        } else {
84            Err(unauthorized!("read {}", TCPath::from(self.path)))
85        }
86    }
87
88    pub fn put(self, key: Value, value: State) -> TCResult<PutFuture<'a>> {
89        let put = self
90            .handler
91            .put()
92            .ok_or_else(|| TCError::method_not_allowed(OpRefType::Put, TCPath::from(self.path)))?;
93
94        if self.mode.may_write() {
95            Ok((put)(self.txn, key, value))
96        } else {
97            Err(unauthorized!("read {}", TCPath::from(self.path)))
98        }
99    }
100
101    pub fn post(self, params: Map<State>) -> TCResult<PostFuture<'a, State>> {
102        let post = self
103            .handler
104            .post()
105            .ok_or_else(|| TCError::method_not_allowed(OpRefType::Post, TCPath::from(self.path)))?;
106
107        if self.mode.may_execute() {
108            Ok((post)(self.txn, params))
109        } else {
110            Err(unauthorized!("execute {}", TCPath::from(self.path)))
111        }
112    }
113
114    pub fn delete(self, key: Value) -> TCResult<DeleteFuture<'a>> {
115        let delete = self.handler.delete().ok_or_else(|| {
116            TCError::method_not_allowed(OpRefType::Delete, TCPath::from(self.path))
117        })?;
118
119        if self.mode.may_write() {
120            Ok((delete)(self.txn, key))
121        } else {
122            Err(unauthorized!("read {}", TCPath::from(self.path)))
123        }
124    }
125}
126
127pub(crate) struct Kernel {
128    class: Cluster<Dir<Class>>,
129    library: Cluster<Dir<Library>>,
130    #[cfg(feature = "service")]
131    service: Cluster<Dir<Service>>,
132    state: tc_state::public::Static<Txn>,
133    hypothetical: Cluster<Hypothetical>,
134    keys: HashSet<aes256::Key>,
135}
136
137impl Kernel {
138    async fn issue_token(&self, txn_id: TxnId, path: &[PathSegment]) -> TCResult<SignedToken> {
139        if path.is_empty() {
140            Err(bad_request!(
141                "cannot issue a token for {}",
142                TCPath::from(path)
143            ))
144        } else if path[0] == SERVICE {
145            #[cfg(feature = "service")]
146            {
147                issue_token(txn_id, self.service.clone(), &path[1..]).await
148            }
149
150            #[cfg(not(feature = "service"))]
151            {
152                Err(not_implemented!("{ERR_NOT_ENABLED}"))
153            }
154        } else if path[0] == LIB {
155            issue_token(txn_id, self.library.clone(), &path[1..]).await
156        } else if path[0] == CLASS {
157            issue_token(txn_id, self.class.clone(), &path[1..]).await
158        } else if path.len() >= 2 && &path[..2] == &Hypothetical::PATH[..] {
159            Err(bad_request!(
160                "cannot issue a token for {}",
161                TCPath::from(path)
162            ))
163        } else {
164            Err(not_found!(
165                "there is no resource at {} to issue a token",
166                TCPath::from(path)
167            ))
168        }
169    }
170
171    pub async fn public_key(&self, txn_id: TxnId, path: &[PathSegment]) -> TCResult<VerifyingKey> {
172        if path.is_empty() {
173            Err(bad_request!("{} has no public key", TCPath::from(path)))
174        } else if path[0] == SERVICE {
175            #[cfg(feature = "service")]
176            {
177                public_key(txn_id, self.service.clone(), &path[1..]).await
178            }
179
180            #[cfg(not(feature = "service"))]
181            {
182                Err(not_implemented!("{ERR_NOT_ENABLED}"))
183            }
184        } else if path[0] == LIB {
185            public_key(txn_id, self.library.clone(), &path[1..]).await
186        } else if path[0] == CLASS {
187            public_key(txn_id, self.class.clone(), &path[1..]).await
188        } else if path.len() >= 2 && &path[..2] == &Hypothetical::PATH[..] {
189            if path.len() == Hypothetical::PATH.len() {
190                Err(bad_request!("{} has no public key", TCPath::from(path)))
191            } else {
192                Err(not_found!("there is no resource at {}", TCPath::from(path)))
193            }
194        } else {
195            Err(not_found!("there is no resource at {}", TCPath::from(path)))
196        }
197    }
198
199    pub async fn route<'a>(
200        &'a self,
201        path: &'a [PathSegment],
202        txn: &'a Txn,
203    ) -> TCResult<Endpoint<'a>> {
204        if path.is_empty() {
205            Ok(Endpoint {
206                mode: Mode::new().with_class_perm(umask::OTHERS, umask::READ),
207                txn,
208                path,
209                handler: Box::new(KernelHandler::from(self)),
210            })
211        } else if path[0] == State::PREFIX {
212            let path = &path[1..];
213            let handler = self
214                .state
215                .route(path)
216                .ok_or_else(|| TCError::not_found(TCPath::from(path)))?;
217
218            Ok(Endpoint {
219                mode: STATE_MODE,
220                txn,
221                path,
222                handler,
223            })
224        } else if path[0] == SERVICE {
225            #[cfg(feature = "service")]
226            {
227                let (path, dir_entry) = self.service.clone().lookup(*txn.id(), &path[1..]).await?;
228
229                match dir_entry {
230                    DirEntry::Dir(cluster) => auth_claim_route(cluster, path, txn).await,
231                    DirEntry::Item(cluster) => auth_claim_route(cluster, path, txn).await,
232                }
233            }
234
235            #[cfg(not(feature = "service"))]
236            {
237                Err(not_implemented!("{ERR_NOT_ENABLED}"))
238            }
239        } else if path[0] == LIB {
240            let (path, dir_entry) = self.library.clone().lookup(*txn.id(), &path[1..]).await?;
241
242            match dir_entry {
243                DirEntry::Dir(cluster) => auth_claim_route(cluster, path, txn).await,
244                DirEntry::Item(cluster) => auth_claim_route(cluster, path, txn).await,
245            }
246        } else if path[0] == CLASS {
247            let (path, dir_entry) = self.class.clone().lookup(*txn.id(), &path[1..]).await?;
248
249            match dir_entry {
250                DirEntry::Dir(cluster) => auth_claim_route(cluster, path, txn).await,
251                DirEntry::Item(cluster) => auth_claim_route(cluster, path, txn).await,
252            }
253        } else if path.len() >= 2 && &path[..2] == &Hypothetical::PATH[..] {
254            auth_claim_route(self.hypothetical.clone(), &path[2..], txn).await
255        } else {
256            Err(TCError::not_found(TCPath::from(path)))
257        }
258    }
259    pub async fn replicate_and_join(
260        &self,
261        txn_server: &TxnServer,
262        peers: &BTreeSet<Host>,
263    ) -> Result<(), bool> {
264        debug!("Kernel::replicate_and_join {peers:?}");
265
266        if peers.is_empty() {
267            info!("not joining replica set since no peers were provided");
268            return Ok(());
269        }
270
271        Self::replicate_and_join_dir(&self.keys, &self.class, txn_server, peers).await?;
272        Self::replicate_and_join_dir(&self.keys, &self.library, txn_server, peers).await?;
273        #[cfg(feature = "service")]
274        Self::replicate_and_join_dir(&self.keys, &self.service, txn_server, peers).await?;
275
276        Self::replicate_and_join_items(&self.keys, &self.class, txn_server, peers).await?;
277        Self::replicate_and_join_items(&self.keys, &self.library, txn_server, peers).await?;
278        #[cfg(feature = "service")]
279        Self::replicate_and_join_items(&self.keys, &self.service, txn_server, peers).await?;
280
281        Ok(())
282    }
283
284    async fn replicate_and_join_dir<T>(
285        keys: &HashSet<aes256::Key>,
286        parent: &Cluster<Dir<T>>,
287        txn_server: &TxnServer,
288        peers: &BTreeSet<Host>,
289    ) -> Result<(), bool>
290    where
291        T: Clone + fmt::Debug,
292        Cluster<Dir<T>>: ReplicateAndJoin,
293    {
294        debug!(
295            "Kernel::replicate_and_join_dir {} with peers {:?}",
296            parent.path(),
297            peers
298        );
299
300        let mut progress = false;
301        let mut unvisited = VecDeque::new();
302        unvisited.push_back(parent.clone());
303
304        while let Some(cluster) = unvisited.pop_front() {
305            let mut joined = false;
306
307            for peer in peers.iter().choose_multiple(&mut OsRng, peers.len()) {
308                let egress = Arc::new(KernelEgress);
309
310                let txn = txn_server
311                    .create_txn(NetworkTime::now())
312                    .expect("txn")
313                    .with_egress(egress.clone());
314
315                trace!("fetching replication token...");
316
317                let txn = match get_and_verify_token_from_peer(
318                    &txn_server,
319                    &txn,
320                    peer,
321                    keys,
322                    cluster.path(),
323                )
324                .await
325                {
326                    Ok(txn) => txn.with_egress(egress.clone()),
327                    Err(cause) => {
328                        warn!("failed to fetch and verify token from {peer}: {cause}");
329                        continue;
330                    }
331                };
332
333                trace!("replicating {cluster:?}...");
334
335                let txn_id = *txn.id();
336                match cluster.replicate_and_join(txn, peer.clone()).await {
337                    Ok(()) => {
338                        joined = true;
339                        progress = true;
340
341                        let entries = cluster.entries(txn_id).await.expect("dir entry list");
342
343                        for (_name, entry) in entries {
344                            match &*entry {
345                                DirEntry::Dir(dir) => unvisited.push_back(dir.clone()),
346                                DirEntry::Item(_) => {}
347                            }
348                        }
349                    }
350                    Err(cause) => warn!("failed to replicate from {peer}: {cause}"),
351                }
352            }
353
354            if !joined {
355                return Err(progress);
356            }
357        }
358
359        Ok(())
360    }
361
362    async fn replicate_and_join_items<T>(
363        keys: &HashSet<aes256::Key>,
364        parent: &Cluster<Dir<T>>,
365        txn_server: &TxnServer,
366        peers: &BTreeSet<Host>,
367    ) -> Result<(), bool>
368    where
369        T: Replicate<Txn> + Transact + Clone + fmt::Debug,
370    {
371        info!(
372            "Kernel::replicate_and_join_dir {} with peers {:?}",
373            parent.path(),
374            peers
375        );
376
377        let mut progress = false;
378        let mut unvisited = VecDeque::new();
379        unvisited.push_back(DirEntry::Dir(parent.clone()));
380
381        while let Some(cluster) = unvisited.pop_front() {
382            let txn = txn_server.create_txn(NetworkTime::now()).expect("txn");
383            let txn_id = *txn.id();
384
385            let item = match cluster {
386                DirEntry::Dir(dir) => {
387                    let entries = dir.entries(txn_id).await.expect("dir entry list");
388                    let entries = entries
389                        .into_iter()
390                        .map(|(_name, entry)| DirEntry::clone(&*entry));
391
392                    unvisited.extend(entries);
393
394                    continue;
395                }
396                DirEntry::Item(item) => item,
397            };
398
399            trace!("replicating {item:?}...");
400
401            let mut joined = false;
402            for peer in peers.iter().choose_multiple(&mut OsRng, peers.len()) {
403                let egress = Arc::new(KernelEgress);
404                let txn = txn.clone().with_egress(egress.clone());
405
406                trace!("fetching replication token...");
407
408                let txn = match get_and_verify_token_from_peer(
409                    &txn_server,
410                    &txn,
411                    peer,
412                    keys,
413                    item.path(),
414                )
415                .await
416                {
417                    Ok(txn) => txn.with_egress(egress.clone()),
418                    Err(cause) => {
419                        warn!("failed to fetch and verify token from {peer}: {cause}");
420                        continue;
421                    }
422                };
423
424                info!("replicating {item:?} from {peer}...");
425
426                match item.replicate_and_join(txn, peer.clone()).await {
427                    Ok(()) => {
428                        joined = true;
429                        progress = true;
430                    }
431                    Err(cause) => warn!("failed to replicate from {peer}: {cause}"),
432                }
433            }
434
435            if !joined {
436                return Err(progress);
437            }
438        }
439
440        Ok(())
441    }
442}
443
444impl Kernel {
445    pub async fn commit(&self, txn_id: TxnId) {
446        debug!("Kernel::commit");
447
448        join!(
449            self.class.commit(txn_id),
450            self.library.commit(txn_id),
451            self.hypothetical.rollback(&txn_id)
452        );
453    }
454
455    pub async fn finalize(&self, txn_id: &TxnId) {
456        trace!("Kernel::finalize");
457
458        join!(
459            self.class.finalize(txn_id),
460            self.library.finalize(txn_id),
461            self.hypothetical.finalize(txn_id)
462        );
463    }
464}
465
466#[derive(Clone, Eq, PartialEq)]
467pub struct Schema {
468    lead: Host,
469    host: Host,
470    owner: Option<Link>,
471    group: Option<Link>,
472    keys: HashSet<aes256::Key>,
473}
474
475impl Schema {
476    pub fn new(
477        lead: Host,
478        host: Host,
479        owner: Option<Link>,
480        group: Option<Link>,
481        keys: HashSet<aes256::Key>,
482    ) -> Self {
483        Self {
484            lead,
485            host,
486            owner,
487            group,
488            keys,
489        }
490    }
491
492    fn to_cluster(&self, prefix: Label) -> cluster::Schema {
493        cluster::Schema::new(
494            self.lead.clone(),
495            Link::new(self.host.clone(), prefix.into()),
496            self.owner.clone(),
497            self.group.clone(),
498        )
499    }
500}
501
502#[async_trait]
503impl fs::Persist<CacheBlock> for Kernel {
504    type Txn = Txn;
505    type Schema = Schema;
506
507    async fn create(
508        txn_id: TxnId,
509        schema: Self::Schema,
510        store: fs::Dir<CacheBlock>,
511    ) -> TCResult<Self> {
512        let lead = schema.lead.clone();
513
514        let class = {
515            let schema = schema.to_cluster(CLASS);
516            let dir: fs::Dir<CacheBlock> = store.create_dir(txn_id, CLASS.into()).await?;
517            fs::Persist::<CacheBlock>::create(txn_id, schema, dir).await?
518        };
519
520        let library = {
521            let schema = schema.to_cluster(LIB);
522            let dir: fs::Dir<CacheBlock> = store.create_dir(txn_id, LIB.into()).await?;
523            fs::Persist::<CacheBlock>::create(txn_id, schema, dir).await?
524        };
525
526        #[cfg(feature = "service")]
527        let service = {
528            let schema = schema.to_cluster(SERVICE);
529            let dir: fs::Dir<CacheBlock> = store.create_dir(txn_id, SERVICE.into()).await?;
530            fs::Persist::<CacheBlock>::create(txn_id, schema, dir).await?
531        };
532
533        let link = Link::new(schema.host, Hypothetical::PATH.into());
534        let txn_schema = cluster::Schema::new(lead, link, schema.owner, schema.group);
535        let hypothetical = Cluster::new(txn_schema, Hypothetical::new());
536
537        Ok(Self {
538            class,
539            library,
540            hypothetical,
541            #[cfg(feature = "service")]
542            service,
543            state: tc_state::public::Static::default(),
544            keys: schema.keys,
545        })
546    }
547
548    async fn load(
549        txn_id: TxnId,
550        schema: Self::Schema,
551        store: fs::Dir<CacheBlock>,
552    ) -> TCResult<Self> {
553        let lead = schema.lead.clone();
554
555        let class = {
556            let schema = schema.to_cluster(CLASS);
557            let dir = store.get_or_create_dir(txn_id, CLASS.into()).await?;
558            fs::Persist::<CacheBlock>::load(txn_id, schema, dir).await?
559        };
560
561        let library = {
562            let schema = schema.to_cluster(LIB);
563            let dir = store.get_or_create_dir(txn_id, LIB.into()).await?;
564            fs::Persist::<CacheBlock>::load(txn_id, schema, dir).await?
565        };
566
567        #[cfg(feature = "service")]
568        let service = {
569            let schema = schema.to_cluster(SERVICE);
570            let dir = store.get_or_create_dir(txn_id, SERVICE.into()).await?;
571            fs::Persist::<CacheBlock>::load(txn_id, schema, dir).await?
572        };
573
574        let link = Link::new(schema.host, Hypothetical::PATH.into());
575        let txn_schema = cluster::Schema::new(lead, link, schema.owner, schema.group);
576        let hypothetical = Cluster::new(txn_schema, Hypothetical::new());
577
578        Ok(Self {
579            class,
580            library,
581            #[cfg(feature = "service")]
582            service,
583            hypothetical,
584            state: tc_state::public::Static::default(),
585            keys: schema.keys,
586        })
587    }
588
589    fn dir(&self) -> fs::Inner<CacheBlock> {
590        unimplemented!("Kernel::inner")
591    }
592}
593
594#[cfg(feature = "service")]
595#[async_trait]
596impl Recover<CacheBlock> for Kernel {
597    type Txn = Txn;
598
599    async fn recover(&self, txn: &Txn) -> TCResult<()> {
600        self.service.recover(txn).await
601    }
602}
603
604async fn auth_claim_route<'a, T>(
605    cluster: Cluster<T>,
606    path: &'a [PathSegment],
607    txn: &'a Txn,
608) -> TCResult<Endpoint<'a>>
609where
610    T: AsyncHash + Route<State> + IsDir + Transact + Send + Sync + fmt::Debug + 'a,
611{
612    let txn_id = *txn.id();
613    let mode = {
614        let resource_mode = cluster.umask(txn_id, path);
615        let request_mode = if txn.has_claims() {
616            let keyring = cluster.keyring(txn_id).await?;
617            txn.mode(keyring, path)
618        } else {
619            Txn::DEFAULT_MODE
620        };
621
622        resource_mode & request_mode
623    };
624
625    if mode == Mode::new() {
626        return Err(unauthorized!("access to {}", TCPath::from(path)));
627    }
628
629    let handler = cluster
630        .route_owned(&*path)
631        .ok_or_else(|| not_found!("endpoint {}", TCPath::from(path)))?;
632
633    let endpoint = Endpoint {
634        mode,
635        txn,
636        path,
637        handler,
638    };
639
640    Ok(endpoint)
641}
642
643async fn get_and_verify_token_from_peer(
644    txn_server: &TxnServer,
645    txn: &Txn,
646    peer: &Host,
647    keys: &HashSet<aes256::Key>,
648    path: &TCPathBuf,
649) -> TCResult<Txn> {
650    let token = get_token_from_peer(&txn, peer, keys, path).await?;
651
652    txn_server
653        .verify_txn(*txn.id(), NetworkTime::now(), token)
654        .await
655}
656
657async fn get_token_from_peer(
658    txn: &Txn,
659    peer: &Host,
660    keys: &HashSet<aes256::Key>,
661    path: &TCPathBuf,
662) -> TCResult<String> {
663    let mut nonce = [0u8; 12];
664    OsRng.fill_bytes(&mut nonce);
665
666    let link = Link::from(peer.clone());
667
668    for key in keys {
669        let cipher = Aes256GcmSiv::new(key);
670        let path_encrypted = encrypt_path(&cipher, &nonce, path)?;
671
672        let nonce_and_path = (
673            Value::Bytes(nonce.into()),
674            Value::Bytes(path_encrypted.into()),
675        );
676
677        trace!("requesting replication token from {link}...");
678
679        let nonce_and_token = txn.get(link.clone(), nonce_and_path).await?;
680
681        let (nonce, token): (Value, Value) =
682            Tuple::<State>::try_from(nonce_and_token).and_then(|tuple| {
683                tuple.try_cast_into(|t| TCError::unexpected(t, "an encrypted auth token"))
684            })?;
685
686        let nonce: Arc<[u8]> = nonce.try_into()?;
687        let token: Arc<[u8]> = token.try_into()?;
688
689        return decrypt_token(&cipher, &nonce, &token);
690    }
691
692    Err(internal!(
693        "no peers provided an auth token for cluster replication"
694    ))
695}
696
697async fn issue_token<T>(
698    txn_id: TxnId,
699    cluster: Cluster<Dir<T>>,
700    path: &[PathSegment],
701) -> TCResult<SignedToken>
702where
703    T: Clone + Send + Sync + fmt::Debug,
704{
705    match cluster.lookup(txn_id, path).await? {
706        (suffix, entry) if suffix.is_empty() => match entry {
707            DirEntry::Dir(dir) => dir.issue_token(Mode::all(), REPLICATION_TTL),
708            DirEntry::Item(item) => item.issue_token(Mode::all(), REPLICATION_TTL),
709        },
710        (suffix, _) => Err(not_found!("cluster at {}", TCPath::from(suffix))),
711    }
712}
713
714async fn public_key<T>(
715    txn_id: TxnId,
716    cluster: Cluster<Dir<T>>,
717    path: &[PathSegment],
718) -> TCResult<VerifyingKey>
719where
720    T: Clone + Send + Sync + fmt::Debug,
721{
722    match cluster.lookup(txn_id, path).await? {
723        (suffix, entry) if suffix.is_empty() => match entry {
724            DirEntry::Dir(dir) => Ok(dir.public_key()),
725            DirEntry::Item(item) => Ok(item.public_key()),
726        },
727        (suffix, _) => Err(not_found!("cluster at {}", TCPath::from(suffix))),
728    }
729}
730
731struct KernelHandler<'a> {
732    kernel: &'a Kernel,
733}
734
735impl<'a> Handler<'a, State> for KernelHandler<'a> {
736    fn get<'b>(self: Box<Self>) -> Option<GetHandler<'a, 'b, Txn, State>>
737    where
738        'b: 'a,
739    {
740        Some(Box::new(|txn, key| {
741            Box::pin(async move {
742                trace!("GET /?key={key:?}");
743
744                let (nonce, path_encrypted): (Value, Value) =
745                    key.try_cast_into(|v| TCError::unexpected(v, "an encrypted path"))?;
746
747                let nonce: Arc<[u8]> =
748                    nonce.try_cast_into(|v| TCError::unexpected(v, "nonce bytes"))?;
749
750                let path_encrypted: Arc<[u8]> = path_encrypted
751                    .try_cast_into(|v| TCError::unexpected(v, "an encrypted path"))?;
752
753                for key in &self.kernel.keys {
754                    let cipher = Aes256GcmSiv::new(key);
755
756                    match decrypt_path(&cipher, &nonce, &*path_encrypted) {
757                        Ok(path) => {
758                            let mut nonce = [0u8; 12];
759                            OsRng.fill_bytes(&mut nonce);
760
761                            let signed_token = self.kernel.issue_token(*txn.id(), &path).await?;
762                            let token = signed_token.into_jwt();
763                            let encrypted_token = encrypt_token(&cipher, &nonce, token)?;
764
765                            return Ok(State::from(Value::Tuple(
766                                vec![nonce.into(), encrypted_token.into()].into(),
767                            )));
768                        }
769                        Err(cause) => {
770                            trace!("unable to decrypt the requested cluster path: {cause}")
771                        }
772                    }
773                }
774
775                Err(bad_request!("unable to decrypt the requested cluster path"))
776            })
777        }))
778    }
779}
780
781impl<'a> From<&'a Kernel> for KernelHandler<'a> {
782    fn from(kernel: &'a Kernel) -> Self {
783        Self { kernel }
784    }
785}
786
787#[inline]
788fn decrypt_path(cipher: &Aes256GcmSiv, nonce: &[u8], path_encrypted: &[u8]) -> TCResult<TCPathBuf> {
789    let nonce = Nonce::try_from(nonce).map_err(|cause| bad_request!("invalid nonce: {cause}"))?;
790
791    match cipher.decrypt(&nonce.into(), path_encrypted) {
792        Ok(path_decrypted) => {
793            let path_decrypted = String::from_utf8(path_decrypted)
794                .map_err(|cause| bad_request!("invalid UTF8: {cause}"))?;
795
796            path_decrypted.parse().map_err(TCError::from)
797        }
798        Err(_cause) => Err(bad_request!("unable to decrypt the requested cluster path")),
799    }
800}
801
802#[inline]
803fn decrypt_token(cipher: &Aes256GcmSiv, nonce: &[u8], token_encrypted: &[u8]) -> TCResult<String> {
804    let nonce = Nonce::try_from(nonce).map_err(|cause| bad_request!("invalid nonce: {cause}"))?;
805
806    match cipher.decrypt(&nonce.into(), token_encrypted) {
807        Ok(token_decrypted) => String::from_utf8(token_decrypted)
808            .map_err(|cause| bad_request!("invalid UTF8: {cause}")),
809
810        Err(_cause) => Err(bad_request!("unable to decrypt the provided auth token")),
811    }
812}
813
814#[inline]
815fn encrypt_path(cipher: &Aes256GcmSiv, nonce: &[u8], path: &TCPathBuf) -> TCResult<Arc<[u8]>> {
816    let path = path.to_string();
817    let nonce = Nonce::try_from(nonce).map_err(|cause| bad_request!("invalid nonce: {cause}"))?;
818
819    cipher
820        .encrypt(&nonce.into(), path.as_bytes())
821        .map(Arc::from)
822        .map_err(|_| internal!("unable to encrypt path"))
823}
824
825#[inline]
826fn encrypt_token(cipher: &Aes256GcmSiv, nonce: &[u8], token: String) -> TCResult<Arc<[u8]>> {
827    let nonce = Nonce::try_from(nonce).map_err(|cause| bad_request!("invalid nonce: {cause}"))?;
828
829    cipher
830        .encrypt(&nonce.into(), token.as_bytes())
831        .map(Arc::from)
832        .map_err(|_| internal!("unable to encrypt token"))
833}