1use std::collections::{BTreeSet, HashSet, VecDeque};
2use std::fmt;
3use std::sync::Arc;
4use std::time::Duration;
5
6use aes_gcm_siv::aead::rand_core::{OsRng, RngCore};
7use aes_gcm_siv::aead::Aead;
8use aes_gcm_siv::{Aes256GcmSiv, KeyInit};
9use async_trait::async_trait;
10use futures::join;
11use log::{debug, info, trace, warn};
12use rand::prelude::IteratorRandom;
13use rjwt::VerifyingKey;
14use safecast::TryCastInto;
15use umask::Mode;
16
17use tc_error::*;
18use tc_scalar::OpRefType;
19#[cfg(feature = "service")]
20use tc_state::chain::Recover;
21use tc_state::CacheBlock;
22use tc_transact::hash::AsyncHash;
23use tc_transact::public::*;
24use tc_transact::{fs, Gateway, Replicate, Transact, Transaction, TxnId};
25use tc_value::{Host, Link, ToUrl, Value};
26use tcgeneric::{label, Label, Map, NetworkTime, PathSegment, TCPath, TCPathBuf, Tuple};
27
28use crate::client::Egress;
29#[cfg(feature = "service")]
30use crate::cluster::Service;
31use crate::cluster::{Class, Cluster, Dir, DirEntry, IsDir, Library, ReplicateAndJoin};
32use crate::txn::{Hypothetical, Txn, TxnServer};
33use crate::{aes256, cluster, Authorize, SignedToken, State};
34
/// The first path segment of requests which `Kernel::route` dispatches to the class directory.
pub const CLASS: Label = label("class");
/// The first path segment of requests which `Kernel::route` dispatches to the library directory.
pub const LIB: Label = label("lib");
/// The first path segment of requests which `Kernel::route` dispatches to the service directory.
pub const SERVICE: Label = label("service");
/// The time-to-live passed to `issue_token` when issuing a cluster replication token.
const REPLICATION_TTL: Duration = Duration::from_secs(30);
/// The permissions of the endpoints routed under `State::PREFIX`:
/// read and execute for the `OTHERS` class.
const STATE_MODE: Mode = Mode::new()
    .with_class_perm(umask::OTHERS, umask::READ)
    .with_class_perm(umask::OTHERS, umask::EXEC);

/// The error message returned for `SERVICE` paths when the `service` feature is disabled.
#[cfg(not(feature = "service"))]
const ERR_NOT_ENABLED: &str = "this binary was compiled without the 'service' feature";

/// The fixed-size (12-byte) nonce used by the encryption helpers in this file.
type Nonce = [u8; 12];
47
48struct KernelEgress;
49
50impl Egress for KernelEgress {
51 fn is_authorized(&self, _link: &ToUrl<'_>, _write: bool) -> bool {
52 true
54 }
55}
56
57impl fmt::Debug for KernelEgress {
58 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
59 f.write_str("kernel egress policy")
60 }
61}
62
63pub struct Endpoint<'a> {
64 mode: Mode,
65 txn: &'a Txn,
66 path: &'a [PathSegment],
67 handler: Box<dyn Handler<'a, State> + 'a>,
68}
69
70impl<'a> Endpoint<'a> {
71 pub fn umask(&self) -> Mode {
72 self.mode
73 }
74
75 pub fn get(self, key: Value) -> TCResult<GetFuture<'a, State>> {
76 let get = self
77 .handler
78 .get()
79 .ok_or_else(|| TCError::method_not_allowed(OpRefType::Get, TCPath::from(self.path)))?;
80
81 if self.mode.may_read() {
82 Ok((get)(self.txn, key))
83 } else {
84 Err(unauthorized!("read {}", TCPath::from(self.path)))
85 }
86 }
87
88 pub fn put(self, key: Value, value: State) -> TCResult<PutFuture<'a>> {
89 let put = self
90 .handler
91 .put()
92 .ok_or_else(|| TCError::method_not_allowed(OpRefType::Put, TCPath::from(self.path)))?;
93
94 if self.mode.may_write() {
95 Ok((put)(self.txn, key, value))
96 } else {
97 Err(unauthorized!("read {}", TCPath::from(self.path)))
98 }
99 }
100
101 pub fn post(self, params: Map<State>) -> TCResult<PostFuture<'a, State>> {
102 let post = self
103 .handler
104 .post()
105 .ok_or_else(|| TCError::method_not_allowed(OpRefType::Post, TCPath::from(self.path)))?;
106
107 if self.mode.may_execute() {
108 Ok((post)(self.txn, params))
109 } else {
110 Err(unauthorized!("execute {}", TCPath::from(self.path)))
111 }
112 }
113
114 pub fn delete(self, key: Value) -> TCResult<DeleteFuture<'a>> {
115 let delete = self.handler.delete().ok_or_else(|| {
116 TCError::method_not_allowed(OpRefType::Delete, TCPath::from(self.path))
117 })?;
118
119 if self.mode.may_write() {
120 Ok((delete)(self.txn, key))
121 } else {
122 Err(unauthorized!("read {}", TCPath::from(self.path)))
123 }
124 }
125}
126
/// The top-level request router of this host.
///
/// `Kernel::route` dispatches on the first segment(s) of a request path to one of the
/// fields below; a request with an empty path is served by a `KernelHandler`.
pub(crate) struct Kernel {
    // routed under the `CLASS` path prefix
    class: Cluster<Dir<Class>>,
    // routed under the `LIB` path prefix
    library: Cluster<Dir<Library>>,
    // routed under the `SERVICE` path prefix (only with the "service" feature)
    #[cfg(feature = "service")]
    service: Cluster<Dir<Service>>,
    // routed under `State::PREFIX`
    state: tc_state::public::Static<Txn>,
    // routed under `Hypothetical::PATH`
    hypothetical: Cluster<Hypothetical>,
    // the keys used to encrypt and decrypt replication token requests and responses
    keys: HashSet<aes256::Key>,
}
136
137impl Kernel {
138 async fn issue_token(&self, txn_id: TxnId, path: &[PathSegment]) -> TCResult<SignedToken> {
139 if path.is_empty() {
140 Err(bad_request!(
141 "cannot issue a token for {}",
142 TCPath::from(path)
143 ))
144 } else if path[0] == SERVICE {
145 #[cfg(feature = "service")]
146 {
147 issue_token(txn_id, self.service.clone(), &path[1..]).await
148 }
149
150 #[cfg(not(feature = "service"))]
151 {
152 Err(not_implemented!("{ERR_NOT_ENABLED}"))
153 }
154 } else if path[0] == LIB {
155 issue_token(txn_id, self.library.clone(), &path[1..]).await
156 } else if path[0] == CLASS {
157 issue_token(txn_id, self.class.clone(), &path[1..]).await
158 } else if path.len() >= 2 && &path[..2] == &Hypothetical::PATH[..] {
159 Err(bad_request!(
160 "cannot issue a token for {}",
161 TCPath::from(path)
162 ))
163 } else {
164 Err(not_found!(
165 "there is no resource at {} to issue a token",
166 TCPath::from(path)
167 ))
168 }
169 }
170
171 pub async fn public_key(&self, txn_id: TxnId, path: &[PathSegment]) -> TCResult<VerifyingKey> {
172 if path.is_empty() {
173 Err(bad_request!("{} has no public key", TCPath::from(path)))
174 } else if path[0] == SERVICE {
175 #[cfg(feature = "service")]
176 {
177 public_key(txn_id, self.service.clone(), &path[1..]).await
178 }
179
180 #[cfg(not(feature = "service"))]
181 {
182 Err(not_implemented!("{ERR_NOT_ENABLED}"))
183 }
184 } else if path[0] == LIB {
185 public_key(txn_id, self.library.clone(), &path[1..]).await
186 } else if path[0] == CLASS {
187 public_key(txn_id, self.class.clone(), &path[1..]).await
188 } else if path.len() >= 2 && &path[..2] == &Hypothetical::PATH[..] {
189 if path.len() == Hypothetical::PATH.len() {
190 Err(bad_request!("{} has no public key", TCPath::from(path)))
191 } else {
192 Err(not_found!("there is no resource at {}", TCPath::from(path)))
193 }
194 } else {
195 Err(not_found!("there is no resource at {}", TCPath::from(path)))
196 }
197 }
198
199 pub async fn route<'a>(
200 &'a self,
201 path: &'a [PathSegment],
202 txn: &'a Txn,
203 ) -> TCResult<Endpoint<'a>> {
204 if path.is_empty() {
205 Ok(Endpoint {
206 mode: Mode::new().with_class_perm(umask::OTHERS, umask::READ),
207 txn,
208 path,
209 handler: Box::new(KernelHandler::from(self)),
210 })
211 } else if path[0] == State::PREFIX {
212 let path = &path[1..];
213 let handler = self
214 .state
215 .route(path)
216 .ok_or_else(|| TCError::not_found(TCPath::from(path)))?;
217
218 Ok(Endpoint {
219 mode: STATE_MODE,
220 txn,
221 path,
222 handler,
223 })
224 } else if path[0] == SERVICE {
225 #[cfg(feature = "service")]
226 {
227 let (path, dir_entry) = self.service.clone().lookup(*txn.id(), &path[1..]).await?;
228
229 match dir_entry {
230 DirEntry::Dir(cluster) => auth_claim_route(cluster, path, txn).await,
231 DirEntry::Item(cluster) => auth_claim_route(cluster, path, txn).await,
232 }
233 }
234
235 #[cfg(not(feature = "service"))]
236 {
237 Err(not_implemented!("{ERR_NOT_ENABLED}"))
238 }
239 } else if path[0] == LIB {
240 let (path, dir_entry) = self.library.clone().lookup(*txn.id(), &path[1..]).await?;
241
242 match dir_entry {
243 DirEntry::Dir(cluster) => auth_claim_route(cluster, path, txn).await,
244 DirEntry::Item(cluster) => auth_claim_route(cluster, path, txn).await,
245 }
246 } else if path[0] == CLASS {
247 let (path, dir_entry) = self.class.clone().lookup(*txn.id(), &path[1..]).await?;
248
249 match dir_entry {
250 DirEntry::Dir(cluster) => auth_claim_route(cluster, path, txn).await,
251 DirEntry::Item(cluster) => auth_claim_route(cluster, path, txn).await,
252 }
253 } else if path.len() >= 2 && &path[..2] == &Hypothetical::PATH[..] {
254 auth_claim_route(self.hypothetical.clone(), &path[2..], txn).await
255 } else {
256 Err(TCError::not_found(TCPath::from(path)))
257 }
258 }
259 pub async fn replicate_and_join(
260 &self,
261 txn_server: &TxnServer,
262 peers: &BTreeSet<Host>,
263 ) -> Result<(), bool> {
264 debug!("Kernel::replicate_and_join {peers:?}");
265
266 if peers.is_empty() {
267 info!("not joining replica set since no peers were provided");
268 return Ok(());
269 }
270
271 Self::replicate_and_join_dir(&self.keys, &self.class, txn_server, peers).await?;
272 Self::replicate_and_join_dir(&self.keys, &self.library, txn_server, peers).await?;
273 #[cfg(feature = "service")]
274 Self::replicate_and_join_dir(&self.keys, &self.service, txn_server, peers).await?;
275
276 Self::replicate_and_join_items(&self.keys, &self.class, txn_server, peers).await?;
277 Self::replicate_and_join_items(&self.keys, &self.library, txn_server, peers).await?;
278 #[cfg(feature = "service")]
279 Self::replicate_and_join_items(&self.keys, &self.service, txn_server, peers).await?;
280
281 Ok(())
282 }
283
284 async fn replicate_and_join_dir<T>(
285 keys: &HashSet<aes256::Key>,
286 parent: &Cluster<Dir<T>>,
287 txn_server: &TxnServer,
288 peers: &BTreeSet<Host>,
289 ) -> Result<(), bool>
290 where
291 T: Clone + fmt::Debug,
292 Cluster<Dir<T>>: ReplicateAndJoin,
293 {
294 debug!(
295 "Kernel::replicate_and_join_dir {} with peers {:?}",
296 parent.path(),
297 peers
298 );
299
300 let mut progress = false;
301 let mut unvisited = VecDeque::new();
302 unvisited.push_back(parent.clone());
303
304 while let Some(cluster) = unvisited.pop_front() {
305 let mut joined = false;
306
307 for peer in peers.iter().choose_multiple(&mut OsRng, peers.len()) {
308 let egress = Arc::new(KernelEgress);
309
310 let txn = txn_server
311 .create_txn(NetworkTime::now())
312 .expect("txn")
313 .with_egress(egress.clone());
314
315 trace!("fetching replication token...");
316
317 let txn = match get_and_verify_token_from_peer(
318 &txn_server,
319 &txn,
320 peer,
321 keys,
322 cluster.path(),
323 )
324 .await
325 {
326 Ok(txn) => txn.with_egress(egress.clone()),
327 Err(cause) => {
328 warn!("failed to fetch and verify token from {peer}: {cause}");
329 continue;
330 }
331 };
332
333 trace!("replicating {cluster:?}...");
334
335 let txn_id = *txn.id();
336 match cluster.replicate_and_join(txn, peer.clone()).await {
337 Ok(()) => {
338 joined = true;
339 progress = true;
340
341 let entries = cluster.entries(txn_id).await.expect("dir entry list");
342
343 for (_name, entry) in entries {
344 match &*entry {
345 DirEntry::Dir(dir) => unvisited.push_back(dir.clone()),
346 DirEntry::Item(_) => {}
347 }
348 }
349 }
350 Err(cause) => warn!("failed to replicate from {peer}: {cause}"),
351 }
352 }
353
354 if !joined {
355 return Err(progress);
356 }
357 }
358
359 Ok(())
360 }
361
362 async fn replicate_and_join_items<T>(
363 keys: &HashSet<aes256::Key>,
364 parent: &Cluster<Dir<T>>,
365 txn_server: &TxnServer,
366 peers: &BTreeSet<Host>,
367 ) -> Result<(), bool>
368 where
369 T: Replicate<Txn> + Transact + Clone + fmt::Debug,
370 {
371 info!(
372 "Kernel::replicate_and_join_dir {} with peers {:?}",
373 parent.path(),
374 peers
375 );
376
377 let mut progress = false;
378 let mut unvisited = VecDeque::new();
379 unvisited.push_back(DirEntry::Dir(parent.clone()));
380
381 while let Some(cluster) = unvisited.pop_front() {
382 let txn = txn_server.create_txn(NetworkTime::now()).expect("txn");
383 let txn_id = *txn.id();
384
385 let item = match cluster {
386 DirEntry::Dir(dir) => {
387 let entries = dir.entries(txn_id).await.expect("dir entry list");
388 let entries = entries
389 .into_iter()
390 .map(|(_name, entry)| DirEntry::clone(&*entry));
391
392 unvisited.extend(entries);
393
394 continue;
395 }
396 DirEntry::Item(item) => item,
397 };
398
399 trace!("replicating {item:?}...");
400
401 let mut joined = false;
402 for peer in peers.iter().choose_multiple(&mut OsRng, peers.len()) {
403 let egress = Arc::new(KernelEgress);
404 let txn = txn.clone().with_egress(egress.clone());
405
406 trace!("fetching replication token...");
407
408 let txn = match get_and_verify_token_from_peer(
409 &txn_server,
410 &txn,
411 peer,
412 keys,
413 item.path(),
414 )
415 .await
416 {
417 Ok(txn) => txn.with_egress(egress.clone()),
418 Err(cause) => {
419 warn!("failed to fetch and verify token from {peer}: {cause}");
420 continue;
421 }
422 };
423
424 info!("replicating {item:?} from {peer}...");
425
426 match item.replicate_and_join(txn, peer.clone()).await {
427 Ok(()) => {
428 joined = true;
429 progress = true;
430 }
431 Err(cause) => warn!("failed to replicate from {peer}: {cause}"),
432 }
433 }
434
435 if !joined {
436 return Err(progress);
437 }
438 }
439
440 Ok(())
441 }
442}
443
444impl Kernel {
445 pub async fn commit(&self, txn_id: TxnId) {
446 debug!("Kernel::commit");
447
448 join!(
449 self.class.commit(txn_id),
450 self.library.commit(txn_id),
451 self.hypothetical.rollback(&txn_id)
452 );
453 }
454
455 pub async fn finalize(&self, txn_id: &TxnId) {
456 trace!("Kernel::finalize");
457
458 join!(
459 self.class.finalize(txn_id),
460 self.library.finalize(txn_id),
461 self.hypothetical.finalize(txn_id)
462 );
463 }
464}
465
466#[derive(Clone, Eq, PartialEq)]
467pub struct Schema {
468 lead: Host,
469 host: Host,
470 owner: Option<Link>,
471 group: Option<Link>,
472 keys: HashSet<aes256::Key>,
473}
474
475impl Schema {
476 pub fn new(
477 lead: Host,
478 host: Host,
479 owner: Option<Link>,
480 group: Option<Link>,
481 keys: HashSet<aes256::Key>,
482 ) -> Self {
483 Self {
484 lead,
485 host,
486 owner,
487 group,
488 keys,
489 }
490 }
491
492 fn to_cluster(&self, prefix: Label) -> cluster::Schema {
493 cluster::Schema::new(
494 self.lead.clone(),
495 Link::new(self.host.clone(), prefix.into()),
496 self.owner.clone(),
497 self.group.clone(),
498 )
499 }
500}
501
502#[async_trait]
503impl fs::Persist<CacheBlock> for Kernel {
504 type Txn = Txn;
505 type Schema = Schema;
506
507 async fn create(
508 txn_id: TxnId,
509 schema: Self::Schema,
510 store: fs::Dir<CacheBlock>,
511 ) -> TCResult<Self> {
512 let lead = schema.lead.clone();
513
514 let class = {
515 let schema = schema.to_cluster(CLASS);
516 let dir: fs::Dir<CacheBlock> = store.create_dir(txn_id, CLASS.into()).await?;
517 fs::Persist::<CacheBlock>::create(txn_id, schema, dir).await?
518 };
519
520 let library = {
521 let schema = schema.to_cluster(LIB);
522 let dir: fs::Dir<CacheBlock> = store.create_dir(txn_id, LIB.into()).await?;
523 fs::Persist::<CacheBlock>::create(txn_id, schema, dir).await?
524 };
525
526 #[cfg(feature = "service")]
527 let service = {
528 let schema = schema.to_cluster(SERVICE);
529 let dir: fs::Dir<CacheBlock> = store.create_dir(txn_id, SERVICE.into()).await?;
530 fs::Persist::<CacheBlock>::create(txn_id, schema, dir).await?
531 };
532
533 let link = Link::new(schema.host, Hypothetical::PATH.into());
534 let txn_schema = cluster::Schema::new(lead, link, schema.owner, schema.group);
535 let hypothetical = Cluster::new(txn_schema, Hypothetical::new());
536
537 Ok(Self {
538 class,
539 library,
540 hypothetical,
541 #[cfg(feature = "service")]
542 service,
543 state: tc_state::public::Static::default(),
544 keys: schema.keys,
545 })
546 }
547
548 async fn load(
549 txn_id: TxnId,
550 schema: Self::Schema,
551 store: fs::Dir<CacheBlock>,
552 ) -> TCResult<Self> {
553 let lead = schema.lead.clone();
554
555 let class = {
556 let schema = schema.to_cluster(CLASS);
557 let dir = store.get_or_create_dir(txn_id, CLASS.into()).await?;
558 fs::Persist::<CacheBlock>::load(txn_id, schema, dir).await?
559 };
560
561 let library = {
562 let schema = schema.to_cluster(LIB);
563 let dir = store.get_or_create_dir(txn_id, LIB.into()).await?;
564 fs::Persist::<CacheBlock>::load(txn_id, schema, dir).await?
565 };
566
567 #[cfg(feature = "service")]
568 let service = {
569 let schema = schema.to_cluster(SERVICE);
570 let dir = store.get_or_create_dir(txn_id, SERVICE.into()).await?;
571 fs::Persist::<CacheBlock>::load(txn_id, schema, dir).await?
572 };
573
574 let link = Link::new(schema.host, Hypothetical::PATH.into());
575 let txn_schema = cluster::Schema::new(lead, link, schema.owner, schema.group);
576 let hypothetical = Cluster::new(txn_schema, Hypothetical::new());
577
578 Ok(Self {
579 class,
580 library,
581 #[cfg(feature = "service")]
582 service,
583 hypothetical,
584 state: tc_state::public::Static::default(),
585 keys: schema.keys,
586 })
587 }
588
589 fn dir(&self) -> fs::Inner<CacheBlock> {
590 unimplemented!("Kernel::inner")
591 }
592}
593
#[cfg(feature = "service")]
#[async_trait]
impl Recover<CacheBlock> for Kernel {
    type Txn = Txn;

    /// Recover the kernel by delegating to the service directory's `recover`.
    /// The class, library, and hypothetical clusters are not touched here.
    async fn recover(&self, txn: &Txn) -> TCResult<()> {
        self.service.recover(txn).await
    }
}
603
604async fn auth_claim_route<'a, T>(
605 cluster: Cluster<T>,
606 path: &'a [PathSegment],
607 txn: &'a Txn,
608) -> TCResult<Endpoint<'a>>
609where
610 T: AsyncHash + Route<State> + IsDir + Transact + Send + Sync + fmt::Debug + 'a,
611{
612 let txn_id = *txn.id();
613 let mode = {
614 let resource_mode = cluster.umask(txn_id, path);
615 let request_mode = if txn.has_claims() {
616 let keyring = cluster.keyring(txn_id).await?;
617 txn.mode(keyring, path)
618 } else {
619 Txn::DEFAULT_MODE
620 };
621
622 resource_mode & request_mode
623 };
624
625 if mode == Mode::new() {
626 return Err(unauthorized!("access to {}", TCPath::from(path)));
627 }
628
629 let handler = cluster
630 .route_owned(&*path)
631 .ok_or_else(|| not_found!("endpoint {}", TCPath::from(path)))?;
632
633 let endpoint = Endpoint {
634 mode,
635 txn,
636 path,
637 handler,
638 };
639
640 Ok(endpoint)
641}
642
643async fn get_and_verify_token_from_peer(
644 txn_server: &TxnServer,
645 txn: &Txn,
646 peer: &Host,
647 keys: &HashSet<aes256::Key>,
648 path: &TCPathBuf,
649) -> TCResult<Txn> {
650 let token = get_token_from_peer(&txn, peer, keys, path).await?;
651
652 txn_server
653 .verify_txn(*txn.id(), NetworkTime::now(), token)
654 .await
655}
656
657async fn get_token_from_peer(
658 txn: &Txn,
659 peer: &Host,
660 keys: &HashSet<aes256::Key>,
661 path: &TCPathBuf,
662) -> TCResult<String> {
663 let mut nonce = [0u8; 12];
664 OsRng.fill_bytes(&mut nonce);
665
666 let link = Link::from(peer.clone());
667
668 for key in keys {
669 let cipher = Aes256GcmSiv::new(key);
670 let path_encrypted = encrypt_path(&cipher, &nonce, path)?;
671
672 let nonce_and_path = (
673 Value::Bytes(nonce.into()),
674 Value::Bytes(path_encrypted.into()),
675 );
676
677 trace!("requesting replication token from {link}...");
678
679 let nonce_and_token = txn.get(link.clone(), nonce_and_path).await?;
680
681 let (nonce, token): (Value, Value) =
682 Tuple::<State>::try_from(nonce_and_token).and_then(|tuple| {
683 tuple.try_cast_into(|t| TCError::unexpected(t, "an encrypted auth token"))
684 })?;
685
686 let nonce: Arc<[u8]> = nonce.try_into()?;
687 let token: Arc<[u8]> = token.try_into()?;
688
689 return decrypt_token(&cipher, &nonce, &token);
690 }
691
692 Err(internal!(
693 "no peers provided an auth token for cluster replication"
694 ))
695}
696
697async fn issue_token<T>(
698 txn_id: TxnId,
699 cluster: Cluster<Dir<T>>,
700 path: &[PathSegment],
701) -> TCResult<SignedToken>
702where
703 T: Clone + Send + Sync + fmt::Debug,
704{
705 match cluster.lookup(txn_id, path).await? {
706 (suffix, entry) if suffix.is_empty() => match entry {
707 DirEntry::Dir(dir) => dir.issue_token(Mode::all(), REPLICATION_TTL),
708 DirEntry::Item(item) => item.issue_token(Mode::all(), REPLICATION_TTL),
709 },
710 (suffix, _) => Err(not_found!("cluster at {}", TCPath::from(suffix))),
711 }
712}
713
714async fn public_key<T>(
715 txn_id: TxnId,
716 cluster: Cluster<Dir<T>>,
717 path: &[PathSegment],
718) -> TCResult<VerifyingKey>
719where
720 T: Clone + Send + Sync + fmt::Debug,
721{
722 match cluster.lookup(txn_id, path).await? {
723 (suffix, entry) if suffix.is_empty() => match entry {
724 DirEntry::Dir(dir) => Ok(dir.public_key()),
725 DirEntry::Item(item) => Ok(item.public_key()),
726 },
727 (suffix, _) => Err(not_found!("cluster at {}", TCPath::from(suffix))),
728 }
729}
730
/// The handler for requests to the empty path, which issues encrypted replication tokens.
struct KernelHandler<'a> {
    kernel: &'a Kernel,
}

impl<'a> Handler<'a, State> for KernelHandler<'a> {
    /// Handle a GET request whose key is a tuple `(nonce, encrypted path)`.
    ///
    /// Each of the kernel's keys is tried in turn to decrypt the path. With the first
    /// key that works, a token is issued for the cluster at the decrypted path and
    /// encrypted with the same key. The response is a tuple `(nonce, encrypted token)`.
    ///
    /// Returns a "bad request" error if none of the kernel's keys can decrypt the path.
    fn get<'b>(self: Box<Self>) -> Option<GetHandler<'a, 'b, Txn, State>>
    where
        'b: 'a,
    {
        Some(Box::new(|txn, key| {
            Box::pin(async move {
                trace!("GET /?key={key:?}");

                let (nonce, path_encrypted): (Value, Value) =
                    key.try_cast_into(|v| TCError::unexpected(v, "an encrypted path"))?;

                let nonce: Arc<[u8]> =
                    nonce.try_cast_into(|v| TCError::unexpected(v, "nonce bytes"))?;

                let path_encrypted: Arc<[u8]> = path_encrypted
                    .try_cast_into(|v| TCError::unexpected(v, "an encrypted path"))?;

                for key in &self.kernel.keys {
                    let cipher = Aes256GcmSiv::new(key);

                    match decrypt_path(&cipher, &nonce, &*path_encrypted) {
                        Ok(path) => {
                            // encrypt the response using a fresh random nonce,
                            // not the nonce of the request
                            let mut nonce = [0u8; 12];
                            OsRng.fill_bytes(&mut nonce);

                            let signed_token = self.kernel.issue_token(*txn.id(), &path).await?;
                            let token = signed_token.into_jwt();
                            let encrypted_token = encrypt_token(&cipher, &nonce, token)?;

                            return Ok(State::from(Value::Tuple(
                                vec![nonce.into(), encrypted_token.into()].into(),
                            )));
                        }
                        Err(cause) => {
                            // this is not necessarily an error; the requester may have used
                            // a different key, so fall through and try the next one
                            trace!("unable to decrypt the requested cluster path: {cause}")
                        }
                    }
                }

                Err(bad_request!("unable to decrypt the requested cluster path"))
            })
        }))
    }
}

impl<'a> From<&'a Kernel> for KernelHandler<'a> {
    /// Construct a handler which borrows the given `kernel`.
    fn from(kernel: &'a Kernel) -> Self {
        Self { kernel }
    }
}
786
787#[inline]
788fn decrypt_path(cipher: &Aes256GcmSiv, nonce: &[u8], path_encrypted: &[u8]) -> TCResult<TCPathBuf> {
789 let nonce = Nonce::try_from(nonce).map_err(|cause| bad_request!("invalid nonce: {cause}"))?;
790
791 match cipher.decrypt(&nonce.into(), path_encrypted) {
792 Ok(path_decrypted) => {
793 let path_decrypted = String::from_utf8(path_decrypted)
794 .map_err(|cause| bad_request!("invalid UTF8: {cause}"))?;
795
796 path_decrypted.parse().map_err(TCError::from)
797 }
798 Err(_cause) => Err(bad_request!("unable to decrypt the requested cluster path")),
799 }
800}
801
802#[inline]
803fn decrypt_token(cipher: &Aes256GcmSiv, nonce: &[u8], token_encrypted: &[u8]) -> TCResult<String> {
804 let nonce = Nonce::try_from(nonce).map_err(|cause| bad_request!("invalid nonce: {cause}"))?;
805
806 match cipher.decrypt(&nonce.into(), token_encrypted) {
807 Ok(token_decrypted) => String::from_utf8(token_decrypted)
808 .map_err(|cause| bad_request!("invalid UTF8: {cause}")),
809
810 Err(_cause) => Err(bad_request!("unable to decrypt the provided auth token")),
811 }
812}
813
814#[inline]
815fn encrypt_path(cipher: &Aes256GcmSiv, nonce: &[u8], path: &TCPathBuf) -> TCResult<Arc<[u8]>> {
816 let path = path.to_string();
817 let nonce = Nonce::try_from(nonce).map_err(|cause| bad_request!("invalid nonce: {cause}"))?;
818
819 cipher
820 .encrypt(&nonce.into(), path.as_bytes())
821 .map(Arc::from)
822 .map_err(|_| internal!("unable to encrypt path"))
823}
824
825#[inline]
826fn encrypt_token(cipher: &Aes256GcmSiv, nonce: &[u8], token: String) -> TCResult<Arc<[u8]>> {
827 let nonce = Nonce::try_from(nonce).map_err(|cause| bad_request!("invalid nonce: {cause}"))?;
828
829 cipher
830 .encrypt(&nonce.into(), token.as_bytes())
831 .map(Arc::from)
832 .map_err(|_| internal!("unable to encrypt token"))
833}