tc_server/cluster/public/
mod.rs

1use std::fmt;
2use std::sync::Arc;
3
4use futures::future::{Future, TryFutureExt};
5use futures::stream::{FuturesUnordered, TryStreamExt};
6use log::debug;
7use rjwt::VerifyingKey;
8use safecast::{TryCastFrom, TryCastInto};
9
10use tc_error::*;
11use tc_transact::hash::AsyncHash;
12use tc_transact::public::*;
13use tc_transact::{Gateway, Transact, Transaction};
14use tc_value::{Host, Link, Value};
15use tcgeneric::{label, Id, Label, Map, PathSegment, TCPath, Tuple};
16
17use crate::txn::Txn;
18use crate::State;
19
20use super::{Cluster, IsDir, REPLICAS};
21
/// Name of the POST parameter which selects the action a replica set handler should perform.
const ACTION: Label = label("action");
/// Value of the [`ACTION`] parameter which asks a replica to join a set of replicas.
const JOIN: Label = label("join");
24
25mod class;
26mod dir;
27mod library;
28#[cfg(feature = "service")]
29mod service;
30
/// Request handler for the root of a [`Cluster`]
/// (the handler which `route_owned` selects when the request path is empty).
struct ClusterHandler<T> {
    /// The cluster whose state and keyring this handler serves.
    cluster: Cluster<T>,
}
34
35impl<'a, T> Handler<'a, State> for ClusterHandler<T>
36where
37    T: AsyncHash + Public<State> + IsDir + Transact + Send + Sync + fmt::Debug + 'a,
38{
39    fn get<'b>(self: Box<Self>) -> Option<GetHandler<'a, 'b, Txn, State>>
40    where
41        'b: 'a,
42    {
43        Some(Box::new(|txn, key: Value| {
44            Box::pin(async move {
45                if txn.has_claims() {
46                    self.cluster.state().get(txn, &[], key).await
47                } else {
48                    let keyring = self.cluster.keyring(*txn.id()).await?;
49
50                    if key.is_none() {
51                        let keyring = keyring
52                            .values()
53                            .map(|public_key| Value::Bytes((*public_key.as_bytes()).into()))
54                            .map(State::from)
55                            .collect();
56
57                        Ok(State::Tuple(keyring))
58                    } else {
59                        let key = Arc::<[u8]>::try_from(key)?;
60
61                        if keyring
62                            .values()
63                            .any(|public_key| public_key.as_bytes() == &key[..])
64                        {
65                            Ok(State::from(Value::from(key)))
66                        } else {
67                            Err(not_found!(
68                                "{:?} (of {} keys)",
69                                Value::Bytes(key),
70                                keyring.len()
71                            ))
72                        }
73                    }
74                }
75            })
76        }))
77    }
78
79    fn put<'b>(self: Box<Self>) -> Option<PutHandler<'a, 'b, Txn, State>>
80    where
81        'b: 'a,
82    {
83        Some(Box::new(|txn, key, value| {
84            Box::pin(async move {
85                if txn.locked_by()?.is_some() {
86                    debug!("received commit message for {:?}", self.cluster);
87
88                    return if key.is_some() || value.is_some() {
89                        Err(TCError::unexpected((key, value), "empty commit message"))
90                    } else if txn.leader(self.cluster.path())?.is_none() {
91                        let txn = self.cluster.claim(txn.clone())?;
92                        self.cluster.replicate_commit(&txn).await
93                    } else {
94                        self.cluster.replicate_commit(txn).await
95                    };
96                }
97
98                let should_commit = txn.leader(self.cluster.path())?.is_none();
99                let txn = self.cluster.claim(txn.clone())?;
100
101                self.cluster
102                    .state
103                    .put(&txn, &[], key.clone(), value.clone())
104                    .await?;
105
106                debug!("write to {:?} succeeded", self.cluster);
107
108                let (_leader, leader_pk) = txn
109                    .leader(self.cluster.path())?
110                    .ok_or_else(|| internal!("leaderless transaction"))?;
111
112                if leader_pk == self.cluster.public_key() {
113                    self.cluster
114                        .replicate_write(&txn, &[], |txn, link| {
115                            debug!("replicating write to {:?} to {}...", self.cluster, link);
116
117                            let key = key.clone();
118                            let value = value.clone();
119                            async move { txn.put(link, key, value).await }
120                        })
121                        .await?;
122                }
123
124                if self.cluster.is_dir() {
125                    let this_host = if let Some(host) = self.cluster.link().host() {
126                        host
127                    } else {
128                        return Ok(());
129                    };
130
131                    let entry_name: PathSegment =
132                        key.try_cast_into(|v| TCError::unexpected(v, "a directory entry name"))?;
133
134                    let (this_replica_hash, this_replica_pk) = self
135                        .cluster
136                        .get_dir_item_key(*txn.id(), &entry_name)
137                        .await?
138                        .ok_or_else(|| {
139                            internal!("there is no directory entry {entry_name} to replicate")
140                        })?;
141
142                    let (leader, leader_pk) = txn.leader(self.cluster.path())?.expect("leader");
143
144                    if leader_pk == self.cluster.public_key() {
145                        let replica_set: Vec<Host> = if let Some(keyring) = self
146                            .cluster
147                            .get_dir_item_keyring(*txn.id(), &entry_name)
148                            .await?
149                        {
150                            keyring.keys().cloned().collect()
151                        } else {
152                            vec![]
153                        };
154
155                        let this_replica_path = self.cluster.path().clone().append(entry_name);
156
157                        replica_set
158                            .iter()
159                            .map(|host| {
160                                let replicas = replica_set
161                                    .iter()
162                                    .filter(|that_host| host != *that_host)
163                                    .cloned()
164                                    .map(Value::from)
165                                    .collect();
166
167                                let params: Map<State> = [
168                                    (Id::from(ACTION), Value::String(JOIN.into())),
169                                    (Id::from(REPLICAS), Value::Tuple(replicas)),
170                                ]
171                                .into_iter()
172                                .collect();
173
174                                txn.post(
175                                    Link::from((
176                                        host.clone(),
177                                        this_replica_path.clone().append(REPLICAS),
178                                    )),
179                                    params,
180                                )
181                            })
182                            .collect::<FuturesUnordered<_>>()
183                            .try_fold(State::default(), |_, _| {
184                                futures::future::ready(Ok(State::default()))
185                            })
186                            .await?;
187                    } else {
188                        let this_replica_hash = Value::Bytes(this_replica_hash.as_slice().into());
189                        let this_replica_pk = Value::Bytes(Arc::new(*this_replica_pk.as_bytes()));
190
191                        txn.put(
192                            leader.append(entry_name).append(REPLICAS),
193                            (this_host.clone(), this_replica_pk),
194                            this_replica_hash,
195                        )
196                        .await?;
197                    }
198                }
199
200                let owner = txn
201                    .owner()?
202                    .ok_or_else(|| internal!("ownerless transaction"))?;
203
204                if should_commit {
205                    if owner == self.cluster.public_key() {
206                        let txn = self.cluster.lock(txn)?;
207                        self.cluster.replicate_commit(&txn).await?;
208                    }
209                }
210
211                Ok(())
212            })
213        }))
214    }
215
216    fn post<'b>(self: Box<Self>) -> Option<PostHandler<'a, 'b, Txn, State>>
217    where
218        'b: 'a,
219    {
220        Some(Box::new(|txn, params| {
221            Box::pin(async move {
222                // This handler can only execute a hypothetical transaction
223
224                let should_rollback = txn.leader(self.cluster.path())?.is_none();
225                let txn = self.cluster.claim(txn.clone())?;
226                let response = self.cluster.state.post(&txn, &[], params).await?;
227
228                if should_rollback {
229                    if let Some(owner) = txn.owner()? {
230                        if owner == self.cluster.public_key() {
231                            let txn = self.cluster.lock(txn.clone())?;
232                            self.cluster.replicate_rollback(&txn).await?;
233                        }
234                    }
235                }
236
237                Ok(response)
238            })
239        }))
240    }
241
242    fn delete<'b>(self: Box<Self>) -> Option<DeleteHandler<'a, 'b, Txn>>
243    where
244        'b: 'a,
245    {
246        Some(Box::new(|txn, key| {
247            Box::pin(async move {
248                if txn.locked_by()?.is_some() {
249                    return if key.is_some() {
250                        Err(TCError::unexpected(key, "empty rollback message"))
251                    } else {
252                        self.cluster.replicate_rollback(txn).await
253                    };
254                }
255
256                let should_commit = txn.leader(self.cluster.path())?.is_none();
257                let txn = self.cluster.claim(txn.clone())?;
258                self.cluster.state.delete(&txn, &[], key.clone()).await?;
259
260                maybe_replicate(
261                    &self.cluster,
262                    &txn,
263                    &[],
264                    |txn, link| {
265                        let key = key.clone();
266                        async move { txn.delete(link, key).await }
267                    },
268                    should_commit,
269                )
270                .await
271            })
272        }))
273    }
274}
275
276impl<T> From<Cluster<T>> for ClusterHandler<T> {
277    fn from(cluster: Cluster<T>) -> Self {
278        Self { cluster }
279    }
280}
281
/// Request handler for the replica set of a [`Cluster`]
/// (the handler which `route_owned` selects when the request path is exactly `REPLICAS`).
struct ReplicaSetHandler<T> {
    /// The cluster whose keyring this handler reads and updates.
    cluster: Cluster<T>,
}
285
286impl<'a, T> Handler<'a, State> for ReplicaSetHandler<T>
287where
288    T: AsyncHash + Send + Sync + fmt::Debug + 'a,
289{
290    fn get<'b>(self: Box<Self>) -> Option<GetHandler<'a, 'b, Txn, State>>
291    where
292        'b: 'a,
293    {
294        Some(Box::new(|txn, key| {
295            Box::pin(async move {
296                debug!("GET {:?} replicas", self.cluster);
297
298                let keyring = self.cluster.keyring(*txn.id()).await?;
299
300                if key.is_some() {
301                    let key = Arc::<[u8]>::try_from(key)?;
302
303                    for (host, public_key) in keyring.iter() {
304                        if public_key.as_bytes() == &key[..] {
305                            return Ok(Value::Link(host.clone().into()).into());
306                        }
307                    }
308
309                    Ok(Value::None.into())
310                } else {
311                    let keyring = keyring
312                        .iter()
313                        .map(|(host, public_key)| {
314                            (
315                                Value::from(host.clone()),
316                                Value::Bytes(Arc::new(*public_key.as_bytes())),
317                            )
318                        })
319                        .map(|(host, public_key)| {
320                            State::Tuple(vec![host.into(), public_key.into()].into())
321                        })
322                        .collect();
323
324                    Ok(State::Tuple(keyring))
325                }
326            })
327        }))
328    }
329
330    fn put<'b>(self: Box<Self>) -> Option<PutHandler<'a, 'b, Txn, State>>
331    where
332        'b: 'a,
333    {
334        Some(Box::new(|txn, key, value| {
335            Box::pin(async move {
336                debug!("PUT {:?} replica {key}: {value:?}", self.cluster);
337
338                let new_replica_hash = Value::try_from(value)?;
339                let this_replica_hash = AsyncHash::hash(self.cluster.state(), *txn.id())
340                    .map_ok(|hash| Arc::<[u8]>::from(hash.as_slice()))
341                    .map_ok(Value::Bytes)
342                    .await?;
343
344                if new_replica_hash != this_replica_hash {
345                    return Err(bad_request!("the provided cluster state hash {new_replica_hash} differs from the hash of this cluster {this_replica_hash}"));
346                }
347
348                let mut keyring = self.cluster.keyring_mut(*txn.id()).await?;
349
350                let (host, public_key): (Host, Arc<[u8]>) =
351                    key.try_cast_into(|v| TCError::unexpected(v, "a host address and key"))?;
352
353                if self.cluster.link().host() == Some(&host) {
354                    return Err(bad_request!(
355                        "cannot overwrite the public key of {:?}",
356                        self.cluster
357                    ));
358                }
359
360                let public_key = VerifyingKey::try_from(&*public_key)
361                    .map_err(|cause| bad_request!("invalid public key: {cause}"))?;
362
363                keyring.insert(host, public_key);
364
365                Ok(())
366            })
367        }))
368    }
369
370    fn post<'b>(self: Box<Self>) -> Option<PostHandler<'a, 'b, Txn, State>>
371    where
372        'b: 'a,
373    {
374        Some(Box::new(|txn, mut params| {
375            Box::pin(async move {
376                let action: Id = params.require(&*ACTION)?;
377                if action != JOIN {
378                    return Err(bad_request!("unrecognized action: {action}"));
379                }
380
381                let replicas: Tuple<Link> = params.require(&*REPLICAS)?;
382
383                let this_host = self
384                    .cluster
385                    .link()
386                    .host()
387                    .cloned()
388                    .ok_or_else(|| bad_request!("{:?} cannot join a cluster", self.cluster))?;
389
390                let this_path = self.cluster.link().path();
391
392                let public_key = Value::Bytes((*self.cluster.public_key().as_bytes()).into());
393                let hash = AsyncHash::hash(self.cluster.state(), *txn.id()).await?;
394
395                replicas
396                    .into_iter()
397                    .map(|mut that_host| {
398                        that_host.extend(this_path.iter().cloned());
399
400                        let this_host = this_host.clone();
401                        let public_key = public_key.clone();
402
403                        async move {
404                            if that_host.host().is_none() {
405                                Err(bad_request!(
406                                    "{} received a join request with no host",
407                                    this_host
408                                ))
409                            } else if that_host.host() == Some(&this_host) {
410                                Err(bad_request!(
411                                    "{} received a join request for itself",
412                                    this_host
413                                ))
414                            } else if that_host.path() == this_path {
415                                txn.put(
416                                    that_host.append(REPLICAS),
417                                    (this_host, public_key),
418                                    Value::Bytes(hash.as_slice().into()),
419                                )
420                                .await
421                            } else {
422                                Err(bad_request!(
423                                    "{} received a request to join {}",
424                                    this_host,
425                                    that_host
426                                ))
427                            }
428                        }
429                    })
430                    .collect::<FuturesUnordered<_>>()
431                    .try_fold(State::default(), |_, _| {
432                        futures::future::ready(Ok(State::default()))
433                    })
434                    .await
435            })
436        }))
437    }
438
439    fn delete<'b>(self: Box<Self>) -> Option<DeleteHandler<'a, 'b, Txn>>
440    where
441        'b: 'a,
442    {
443        Some(Box::new(|txn, key| {
444            Box::pin(async move {
445                debug!("DELETE {:?} replicas {key}", self.cluster);
446
447                let mut keyring = self.cluster.keyring_mut(*txn.id()).await?;
448
449                let hosts = Tuple::<Value>::try_from(key)?;
450
451                for host in hosts {
452                    let host =
453                        Host::try_cast_from(host, |v| TCError::unexpected(v, "a host address"))?;
454
455                    keyring.remove(&host);
456                }
457
458                Ok(())
459            })
460        }))
461    }
462}
463
464impl<T> From<Cluster<T>> for ReplicaSetHandler<T> {
465    fn from(cluster: Cluster<T>) -> Self {
466        Self { cluster }
467    }
468}
469
/// Request handler which forwards a request to a sub-path of the state of a [`Cluster`]
/// (the handler which `route_owned` selects for any path other than the root or `REPLICAS`).
struct ReplicationHandler<'a, T> {
    /// The cluster whose state receives the request.
    cluster: Cluster<T>,
    /// The request path, passed through to the cluster state.
    path: &'a [PathSegment],
}
474
475impl<'a, T> ReplicationHandler<'a, T> {
476    fn new(cluster: Cluster<T>, path: &'a [PathSegment]) -> Self {
477        Self { cluster, path }
478    }
479}
480
481impl<'a, T> Handler<'a, State> for ReplicationHandler<'a, T>
482where
483    T: Public<State> + Transact + Send + Sync + fmt::Debug + 'a,
484{
485    fn get<'b>(self: Box<Self>) -> Option<GetHandler<'a, 'b, Txn, State>>
486    where
487        'b: 'a,
488    {
489        Some(Box::new(|txn, key| {
490            Box::pin(async move {
491                let txn = self.cluster.claim(txn.clone())?;
492                self.cluster.state().get(&txn, self.path, key).await
493            })
494        }))
495    }
496
497    fn put<'b>(self: Box<Self>) -> Option<PutHandler<'a, 'b, Txn, State>>
498    where
499        'b: 'a,
500    {
501        let path = self.path;
502        let cluster = self.cluster;
503
504        Some(Box::new(move |txn, key, value| {
505            Box::pin(async move {
506                let should_commit = txn.leader(cluster.path())?.is_none();
507                let txn = cluster.claim(txn.clone())?;
508
509                cluster
510                    .state()
511                    .put(&txn, path, key.clone(), value.clone())
512                    .await?;
513
514                maybe_replicate(
515                    &cluster,
516                    &txn,
517                    path,
518                    |txn, link| {
519                        let key = key.clone();
520                        let value = value.clone();
521                        async move { txn.put(link, key, value).await }
522                    },
523                    should_commit,
524                )
525                .await
526            })
527        }))
528    }
529
530    fn post<'b>(self: Box<Self>) -> Option<PostHandler<'a, 'b, Txn, State>>
531    where
532        'b: 'a,
533    {
534        Some(Box::new(|txn, params| {
535            Box::pin(async move {
536                let should_commit = txn.leader(self.cluster.path())?.is_none();
537                let txn = self.cluster.claim(txn.clone())?;
538                let response = self.cluster.state().post(&txn, self.path, params).await?;
539
540                if should_commit {
541                    if let Some(owner) = txn.owner()? {
542                        if owner == self.cluster.public_key() {
543                            let txn = self.cluster.lock(txn)?;
544                            self.cluster.replicate_commit(&txn).await?;
545                        }
546                    }
547                }
548
549                Ok(response)
550            })
551        }))
552    }
553
554    fn delete<'b>(self: Box<Self>) -> Option<DeleteHandler<'a, 'b, Txn>>
555    where
556        'b: 'a,
557    {
558        let path = self.path;
559        let cluster = self.cluster;
560
561        Some(Box::new(move |txn, key| {
562            Box::pin(async move {
563                let should_commit = txn.leader(cluster.path())?.is_none();
564                let txn = cluster.claim(txn.clone())?;
565
566                cluster.state().delete(&txn, path, key.clone()).await?;
567
568                maybe_replicate(
569                    &cluster,
570                    &txn,
571                    path,
572                    |txn, link| {
573                        let key = key.clone();
574
575                        async move { txn.delete(link, key).await }
576                    },
577                    should_commit,
578                )
579                .await
580            })
581        }))
582    }
583}
584
585async fn maybe_replicate<T, Op, Fut>(
586    cluster: &Cluster<T>,
587    txn: &Txn,
588    path: &[PathSegment],
589    op: Op,
590    should_commit: bool,
591) -> TCResult<()>
592where
593    T: Transact + Send + Sync + fmt::Debug,
594    Op: Fn(Txn, Link) -> Fut,
595    Fut: Future<Output = TCResult<()>>,
596{
597    let (_leader, leader_pk) = txn
598        .leader(cluster.path())?
599        .ok_or_else(|| internal!("leaderless transaction"))?;
600
601    if leader_pk == cluster.public_key() {
602        cluster.replicate_write(txn, path, op).await?;
603    }
604
605    if should_commit {
606        if let Some(owner) = txn.owner()? {
607            if owner == cluster.public_key() {
608                let txn = cluster.lock(txn.clone())?;
609                cluster.replicate_commit(&txn).await?;
610            }
611        }
612    }
613
614    Ok(())
615}
616
617impl<T> Cluster<T>
618where
619    T: AsyncHash + Route<State> + IsDir + Transact + Send + Sync + fmt::Debug,
620{
621    pub fn route_owned<'a>(
622        self,
623        path: &'a [PathSegment],
624    ) -> Option<Box<dyn Handler<'a, State> + 'a>>
625    where
626        T: 'a,
627    {
628        debug!("{:?} routing request to {}...", self, TCPath::from(path));
629
630        if path.is_empty() {
631            Some(Box::new(ClusterHandler::from(self)))
632        } else if path == [REPLICAS] {
633            Some(Box::new(ReplicaSetHandler::from(self)))
634        } else {
635            Some(Box::new(ReplicationHandler::new(self, path)))
636        }
637    }
638}
639
640impl<T> Route<State> for Cluster<T>
641where
642    T: AsyncHash + Route<State> + IsDir + Transact + Clone + Send + Sync + fmt::Debug,
643{
644    fn route<'a>(&'a self, path: &'a [PathSegment]) -> Option<Box<dyn Handler<'a, State> + 'a>> {
645        self.clone().route_owned(path)
646    }
647}