1use std::fmt;
2use std::sync::Arc;
3
4use futures::future::{Future, TryFutureExt};
5use futures::stream::{FuturesUnordered, TryStreamExt};
6use log::debug;
7use rjwt::VerifyingKey;
8use safecast::{TryCastFrom, TryCastInto};
9
10use tc_error::*;
11use tc_transact::hash::AsyncHash;
12use tc_transact::public::*;
13use tc_transact::{Gateway, Transact, Transaction};
14use tc_value::{Host, Link, Value};
15use tcgeneric::{label, Id, Label, Map, PathSegment, TCPath, Tuple};
16
17use crate::txn::Txn;
18use crate::State;
19
20use super::{Cluster, IsDir, REPLICAS};
21
// Name of the parameter in which `ReplicaSetHandler::post` expects the requested action.
const ACTION: Label = label("action");
// The only action accepted by `ReplicaSetHandler::post`; sent by `ClusterHandler::put`
// when a directory leader coordinates the replica set of a new entry.
const JOIN: Label = label("join");

mod class;
mod dir;
mod library;
// only compiled when the `service` feature is enabled
#[cfg(feature = "service")]
mod service;
30
/// Handles requests addressed to a [`Cluster`] itself, i.e. with an empty path
/// (see `Cluster::route_owned`).
struct ClusterHandler<T> {
    cluster: Cluster<T>,
}
34
35impl<'a, T> Handler<'a, State> for ClusterHandler<T>
36where
37 T: AsyncHash + Public<State> + IsDir + Transact + Send + Sync + fmt::Debug + 'a,
38{
39 fn get<'b>(self: Box<Self>) -> Option<GetHandler<'a, 'b, Txn, State>>
40 where
41 'b: 'a,
42 {
43 Some(Box::new(|txn, key: Value| {
44 Box::pin(async move {
45 if txn.has_claims() {
46 self.cluster.state().get(txn, &[], key).await
47 } else {
48 let keyring = self.cluster.keyring(*txn.id()).await?;
49
50 if key.is_none() {
51 let keyring = keyring
52 .values()
53 .map(|public_key| Value::Bytes((*public_key.as_bytes()).into()))
54 .map(State::from)
55 .collect();
56
57 Ok(State::Tuple(keyring))
58 } else {
59 let key = Arc::<[u8]>::try_from(key)?;
60
61 if keyring
62 .values()
63 .any(|public_key| public_key.as_bytes() == &key[..])
64 {
65 Ok(State::from(Value::from(key)))
66 } else {
67 Err(not_found!(
68 "{:?} (of {} keys)",
69 Value::Bytes(key),
70 keyring.len()
71 ))
72 }
73 }
74 }
75 })
76 }))
77 }
78
79 fn put<'b>(self: Box<Self>) -> Option<PutHandler<'a, 'b, Txn, State>>
80 where
81 'b: 'a,
82 {
83 Some(Box::new(|txn, key, value| {
84 Box::pin(async move {
85 if txn.locked_by()?.is_some() {
86 debug!("received commit message for {:?}", self.cluster);
87
88 return if key.is_some() || value.is_some() {
89 Err(TCError::unexpected((key, value), "empty commit message"))
90 } else if txn.leader(self.cluster.path())?.is_none() {
91 let txn = self.cluster.claim(txn.clone())?;
92 self.cluster.replicate_commit(&txn).await
93 } else {
94 self.cluster.replicate_commit(txn).await
95 };
96 }
97
98 let should_commit = txn.leader(self.cluster.path())?.is_none();
99 let txn = self.cluster.claim(txn.clone())?;
100
101 self.cluster
102 .state
103 .put(&txn, &[], key.clone(), value.clone())
104 .await?;
105
106 debug!("write to {:?} succeeded", self.cluster);
107
108 let (_leader, leader_pk) = txn
109 .leader(self.cluster.path())?
110 .ok_or_else(|| internal!("leaderless transaction"))?;
111
112 if leader_pk == self.cluster.public_key() {
113 self.cluster
114 .replicate_write(&txn, &[], |txn, link| {
115 debug!("replicating write to {:?} to {}...", self.cluster, link);
116
117 let key = key.clone();
118 let value = value.clone();
119 async move { txn.put(link, key, value).await }
120 })
121 .await?;
122 }
123
124 if self.cluster.is_dir() {
125 let this_host = if let Some(host) = self.cluster.link().host() {
126 host
127 } else {
128 return Ok(());
129 };
130
131 let entry_name: PathSegment =
132 key.try_cast_into(|v| TCError::unexpected(v, "a directory entry name"))?;
133
134 let (this_replica_hash, this_replica_pk) = self
135 .cluster
136 .get_dir_item_key(*txn.id(), &entry_name)
137 .await?
138 .ok_or_else(|| {
139 internal!("there is no directory entry {entry_name} to replicate")
140 })?;
141
142 let (leader, leader_pk) = txn.leader(self.cluster.path())?.expect("leader");
143
144 if leader_pk == self.cluster.public_key() {
145 let replica_set: Vec<Host> = if let Some(keyring) = self
146 .cluster
147 .get_dir_item_keyring(*txn.id(), &entry_name)
148 .await?
149 {
150 keyring.keys().cloned().collect()
151 } else {
152 vec![]
153 };
154
155 let this_replica_path = self.cluster.path().clone().append(entry_name);
156
157 replica_set
158 .iter()
159 .map(|host| {
160 let replicas = replica_set
161 .iter()
162 .filter(|that_host| host != *that_host)
163 .cloned()
164 .map(Value::from)
165 .collect();
166
167 let params: Map<State> = [
168 (Id::from(ACTION), Value::String(JOIN.into())),
169 (Id::from(REPLICAS), Value::Tuple(replicas)),
170 ]
171 .into_iter()
172 .collect();
173
174 txn.post(
175 Link::from((
176 host.clone(),
177 this_replica_path.clone().append(REPLICAS),
178 )),
179 params,
180 )
181 })
182 .collect::<FuturesUnordered<_>>()
183 .try_fold(State::default(), |_, _| {
184 futures::future::ready(Ok(State::default()))
185 })
186 .await?;
187 } else {
188 let this_replica_hash = Value::Bytes(this_replica_hash.as_slice().into());
189 let this_replica_pk = Value::Bytes(Arc::new(*this_replica_pk.as_bytes()));
190
191 txn.put(
192 leader.append(entry_name).append(REPLICAS),
193 (this_host.clone(), this_replica_pk),
194 this_replica_hash,
195 )
196 .await?;
197 }
198 }
199
200 let owner = txn
201 .owner()?
202 .ok_or_else(|| internal!("ownerless transaction"))?;
203
204 if should_commit {
205 if owner == self.cluster.public_key() {
206 let txn = self.cluster.lock(txn)?;
207 self.cluster.replicate_commit(&txn).await?;
208 }
209 }
210
211 Ok(())
212 })
213 }))
214 }
215
216 fn post<'b>(self: Box<Self>) -> Option<PostHandler<'a, 'b, Txn, State>>
217 where
218 'b: 'a,
219 {
220 Some(Box::new(|txn, params| {
221 Box::pin(async move {
222 let should_rollback = txn.leader(self.cluster.path())?.is_none();
225 let txn = self.cluster.claim(txn.clone())?;
226 let response = self.cluster.state.post(&txn, &[], params).await?;
227
228 if should_rollback {
229 if let Some(owner) = txn.owner()? {
230 if owner == self.cluster.public_key() {
231 let txn = self.cluster.lock(txn.clone())?;
232 self.cluster.replicate_rollback(&txn).await?;
233 }
234 }
235 }
236
237 Ok(response)
238 })
239 }))
240 }
241
242 fn delete<'b>(self: Box<Self>) -> Option<DeleteHandler<'a, 'b, Txn>>
243 where
244 'b: 'a,
245 {
246 Some(Box::new(|txn, key| {
247 Box::pin(async move {
248 if txn.locked_by()?.is_some() {
249 return if key.is_some() {
250 Err(TCError::unexpected(key, "empty rollback message"))
251 } else {
252 self.cluster.replicate_rollback(txn).await
253 };
254 }
255
256 let should_commit = txn.leader(self.cluster.path())?.is_none();
257 let txn = self.cluster.claim(txn.clone())?;
258 self.cluster.state.delete(&txn, &[], key.clone()).await?;
259
260 maybe_replicate(
261 &self.cluster,
262 &txn,
263 &[],
264 |txn, link| {
265 let key = key.clone();
266 async move { txn.delete(link, key).await }
267 },
268 should_commit,
269 )
270 .await
271 })
272 }))
273 }
274}
275
276impl<T> From<Cluster<T>> for ClusterHandler<T> {
277 fn from(cluster: Cluster<T>) -> Self {
278 Self { cluster }
279 }
280}
281
/// Handles requests addressed to the replica set of a [`Cluster`], i.e. the path
/// `[REPLICAS]` (see `Cluster::route_owned`).
struct ReplicaSetHandler<T> {
    cluster: Cluster<T>,
}
285
286impl<'a, T> Handler<'a, State> for ReplicaSetHandler<T>
287where
288 T: AsyncHash + Send + Sync + fmt::Debug + 'a,
289{
290 fn get<'b>(self: Box<Self>) -> Option<GetHandler<'a, 'b, Txn, State>>
291 where
292 'b: 'a,
293 {
294 Some(Box::new(|txn, key| {
295 Box::pin(async move {
296 debug!("GET {:?} replicas", self.cluster);
297
298 let keyring = self.cluster.keyring(*txn.id()).await?;
299
300 if key.is_some() {
301 let key = Arc::<[u8]>::try_from(key)?;
302
303 for (host, public_key) in keyring.iter() {
304 if public_key.as_bytes() == &key[..] {
305 return Ok(Value::Link(host.clone().into()).into());
306 }
307 }
308
309 Ok(Value::None.into())
310 } else {
311 let keyring = keyring
312 .iter()
313 .map(|(host, public_key)| {
314 (
315 Value::from(host.clone()),
316 Value::Bytes(Arc::new(*public_key.as_bytes())),
317 )
318 })
319 .map(|(host, public_key)| {
320 State::Tuple(vec![host.into(), public_key.into()].into())
321 })
322 .collect();
323
324 Ok(State::Tuple(keyring))
325 }
326 })
327 }))
328 }
329
330 fn put<'b>(self: Box<Self>) -> Option<PutHandler<'a, 'b, Txn, State>>
331 where
332 'b: 'a,
333 {
334 Some(Box::new(|txn, key, value| {
335 Box::pin(async move {
336 debug!("PUT {:?} replica {key}: {value:?}", self.cluster);
337
338 let new_replica_hash = Value::try_from(value)?;
339 let this_replica_hash = AsyncHash::hash(self.cluster.state(), *txn.id())
340 .map_ok(|hash| Arc::<[u8]>::from(hash.as_slice()))
341 .map_ok(Value::Bytes)
342 .await?;
343
344 if new_replica_hash != this_replica_hash {
345 return Err(bad_request!("the provided cluster state hash {new_replica_hash} differs from the hash of this cluster {this_replica_hash}"));
346 }
347
348 let mut keyring = self.cluster.keyring_mut(*txn.id()).await?;
349
350 let (host, public_key): (Host, Arc<[u8]>) =
351 key.try_cast_into(|v| TCError::unexpected(v, "a host address and key"))?;
352
353 if self.cluster.link().host() == Some(&host) {
354 return Err(bad_request!(
355 "cannot overwrite the public key of {:?}",
356 self.cluster
357 ));
358 }
359
360 let public_key = VerifyingKey::try_from(&*public_key)
361 .map_err(|cause| bad_request!("invalid public key: {cause}"))?;
362
363 keyring.insert(host, public_key);
364
365 Ok(())
366 })
367 }))
368 }
369
370 fn post<'b>(self: Box<Self>) -> Option<PostHandler<'a, 'b, Txn, State>>
371 where
372 'b: 'a,
373 {
374 Some(Box::new(|txn, mut params| {
375 Box::pin(async move {
376 let action: Id = params.require(&*ACTION)?;
377 if action != JOIN {
378 return Err(bad_request!("unrecognized action: {action}"));
379 }
380
381 let replicas: Tuple<Link> = params.require(&*REPLICAS)?;
382
383 let this_host = self
384 .cluster
385 .link()
386 .host()
387 .cloned()
388 .ok_or_else(|| bad_request!("{:?} cannot join a cluster", self.cluster))?;
389
390 let this_path = self.cluster.link().path();
391
392 let public_key = Value::Bytes((*self.cluster.public_key().as_bytes()).into());
393 let hash = AsyncHash::hash(self.cluster.state(), *txn.id()).await?;
394
395 replicas
396 .into_iter()
397 .map(|mut that_host| {
398 that_host.extend(this_path.iter().cloned());
399
400 let this_host = this_host.clone();
401 let public_key = public_key.clone();
402
403 async move {
404 if that_host.host().is_none() {
405 Err(bad_request!(
406 "{} received a join request with no host",
407 this_host
408 ))
409 } else if that_host.host() == Some(&this_host) {
410 Err(bad_request!(
411 "{} received a join request for itself",
412 this_host
413 ))
414 } else if that_host.path() == this_path {
415 txn.put(
416 that_host.append(REPLICAS),
417 (this_host, public_key),
418 Value::Bytes(hash.as_slice().into()),
419 )
420 .await
421 } else {
422 Err(bad_request!(
423 "{} received a request to join {}",
424 this_host,
425 that_host
426 ))
427 }
428 }
429 })
430 .collect::<FuturesUnordered<_>>()
431 .try_fold(State::default(), |_, _| {
432 futures::future::ready(Ok(State::default()))
433 })
434 .await
435 })
436 }))
437 }
438
439 fn delete<'b>(self: Box<Self>) -> Option<DeleteHandler<'a, 'b, Txn>>
440 where
441 'b: 'a,
442 {
443 Some(Box::new(|txn, key| {
444 Box::pin(async move {
445 debug!("DELETE {:?} replicas {key}", self.cluster);
446
447 let mut keyring = self.cluster.keyring_mut(*txn.id()).await?;
448
449 let hosts = Tuple::<Value>::try_from(key)?;
450
451 for host in hosts {
452 let host =
453 Host::try_cast_from(host, |v| TCError::unexpected(v, "a host address"))?;
454
455 keyring.remove(&host);
456 }
457
458 Ok(())
459 })
460 }))
461 }
462}
463
464impl<T> From<Cluster<T>> for ReplicaSetHandler<T> {
465 fn from(cluster: Cluster<T>) -> Self {
466 Self { cluster }
467 }
468}
469
/// Handles requests addressed to any other path within a [`Cluster`] (neither the cluster
/// itself nor its replica set), forwarding them to the cluster state.
struct ReplicationHandler<'a, T> {
    cluster: Cluster<T>,
    // the request path within the cluster, passed through to the cluster state
    path: &'a [PathSegment],
}
474
475impl<'a, T> ReplicationHandler<'a, T> {
476 fn new(cluster: Cluster<T>, path: &'a [PathSegment]) -> Self {
477 Self { cluster, path }
478 }
479}
480
481impl<'a, T> Handler<'a, State> for ReplicationHandler<'a, T>
482where
483 T: Public<State> + Transact + Send + Sync + fmt::Debug + 'a,
484{
485 fn get<'b>(self: Box<Self>) -> Option<GetHandler<'a, 'b, Txn, State>>
486 where
487 'b: 'a,
488 {
489 Some(Box::new(|txn, key| {
490 Box::pin(async move {
491 let txn = self.cluster.claim(txn.clone())?;
492 self.cluster.state().get(&txn, self.path, key).await
493 })
494 }))
495 }
496
497 fn put<'b>(self: Box<Self>) -> Option<PutHandler<'a, 'b, Txn, State>>
498 where
499 'b: 'a,
500 {
501 let path = self.path;
502 let cluster = self.cluster;
503
504 Some(Box::new(move |txn, key, value| {
505 Box::pin(async move {
506 let should_commit = txn.leader(cluster.path())?.is_none();
507 let txn = cluster.claim(txn.clone())?;
508
509 cluster
510 .state()
511 .put(&txn, path, key.clone(), value.clone())
512 .await?;
513
514 maybe_replicate(
515 &cluster,
516 &txn,
517 path,
518 |txn, link| {
519 let key = key.clone();
520 let value = value.clone();
521 async move { txn.put(link, key, value).await }
522 },
523 should_commit,
524 )
525 .await
526 })
527 }))
528 }
529
530 fn post<'b>(self: Box<Self>) -> Option<PostHandler<'a, 'b, Txn, State>>
531 where
532 'b: 'a,
533 {
534 Some(Box::new(|txn, params| {
535 Box::pin(async move {
536 let should_commit = txn.leader(self.cluster.path())?.is_none();
537 let txn = self.cluster.claim(txn.clone())?;
538 let response = self.cluster.state().post(&txn, self.path, params).await?;
539
540 if should_commit {
541 if let Some(owner) = txn.owner()? {
542 if owner == self.cluster.public_key() {
543 let txn = self.cluster.lock(txn)?;
544 self.cluster.replicate_commit(&txn).await?;
545 }
546 }
547 }
548
549 Ok(response)
550 })
551 }))
552 }
553
554 fn delete<'b>(self: Box<Self>) -> Option<DeleteHandler<'a, 'b, Txn>>
555 where
556 'b: 'a,
557 {
558 let path = self.path;
559 let cluster = self.cluster;
560
561 Some(Box::new(move |txn, key| {
562 Box::pin(async move {
563 let should_commit = txn.leader(cluster.path())?.is_none();
564 let txn = cluster.claim(txn.clone())?;
565
566 cluster.state().delete(&txn, path, key.clone()).await?;
567
568 maybe_replicate(
569 &cluster,
570 &txn,
571 path,
572 |txn, link| {
573 let key = key.clone();
574
575 async move { txn.delete(link, key).await }
576 },
577 should_commit,
578 )
579 .await
580 })
581 }))
582 }
583}
584
585async fn maybe_replicate<T, Op, Fut>(
586 cluster: &Cluster<T>,
587 txn: &Txn,
588 path: &[PathSegment],
589 op: Op,
590 should_commit: bool,
591) -> TCResult<()>
592where
593 T: Transact + Send + Sync + fmt::Debug,
594 Op: Fn(Txn, Link) -> Fut,
595 Fut: Future<Output = TCResult<()>>,
596{
597 let (_leader, leader_pk) = txn
598 .leader(cluster.path())?
599 .ok_or_else(|| internal!("leaderless transaction"))?;
600
601 if leader_pk == cluster.public_key() {
602 cluster.replicate_write(txn, path, op).await?;
603 }
604
605 if should_commit {
606 if let Some(owner) = txn.owner()? {
607 if owner == cluster.public_key() {
608 let txn = cluster.lock(txn.clone())?;
609 cluster.replicate_commit(&txn).await?;
610 }
611 }
612 }
613
614 Ok(())
615}
616
617impl<T> Cluster<T>
618where
619 T: AsyncHash + Route<State> + IsDir + Transact + Send + Sync + fmt::Debug,
620{
621 pub fn route_owned<'a>(
622 self,
623 path: &'a [PathSegment],
624 ) -> Option<Box<dyn Handler<'a, State> + 'a>>
625 where
626 T: 'a,
627 {
628 debug!("{:?} routing request to {}...", self, TCPath::from(path));
629
630 if path.is_empty() {
631 Some(Box::new(ClusterHandler::from(self)))
632 } else if path == [REPLICAS] {
633 Some(Box::new(ReplicaSetHandler::from(self)))
634 } else {
635 Some(Box::new(ReplicationHandler::new(self, path)))
636 }
637 }
638}
639
640impl<T> Route<State> for Cluster<T>
641where
642 T: AsyncHash + Route<State> + IsDir + Transact + Clone + Send + Sync + fmt::Debug,
643{
644 fn route<'a>(&'a self, path: &'a [PathSegment]) -> Option<Box<dyn Handler<'a, State> + 'a>> {
645 self.clone().route_owned(path)
646 }
647}