netidx_protocols/
cluster.rs

1use anyhow::Result;
2use futures::{channel::mpsc, prelude::*};
3use log::{info, warn};
4use netidx::{
5    pack::Pack,
6    path::Path,
7    publisher::{Publisher, Val, Value, WriteRequest},
8    resolver_client::ChangeTracker,
9    subscriber::{Dval, Event, Subscriber},
10    utils,
11};
12use poolshark::Pooled;
13use std::{
14    collections::{HashMap, HashSet},
15    iter,
16    marker::PhantomData,
17};
18use tokio::time;
19use uuid::Uuid;
20
21pub fn uuid_string(id: Uuid) -> String {
22    use uuid::fmt::Simple;
23    let mut buf = [0u8; Simple::LENGTH];
24    Simple::from_uuid(id).encode_lower(&mut buf).into()
25}
26
27/// Simple clustering based on netidx. Each member publishes a uuid to
28/// a common base path, which is used to discover all other
29/// members. Commands may be sent to and received from all other
30/// members as broadcasts. On initialization all members wait until
31/// there are at least `shards` other members before starting
32/// operations. There can be more than `shards` members at any time,
33/// and members can enter and leave the cluster at will. It is up to
34/// the user to ensure state integrity under these constraints.
35///
36/// A random cluster member is elected 'primary' by an common
37/// algorithm, the primary may change as members join and leave the
38/// cluster, but with a stable member set all members will agree on
39/// which one is the primary.
40pub struct Cluster<T: Pack> {
41    t: PhantomData<T>,
42    ctrack: ChangeTracker,
43    publisher: Publisher,
44    subscriber: Subscriber,
45    our_path: Path,
46    us: Val,
47    others: HashMap<Path, Dval>,
48    cmd: mpsc::Receiver<Pooled<Vec<WriteRequest>>>,
49    primary: bool,
50}
51
52impl<T: Pack> Cluster<T> {
53    /// Create a new cluster directly under `base`. It's wise to
54    /// ensure nothing else is publishing under `base`.
55    pub async fn new(
56        publisher: &Publisher,
57        subscriber: Subscriber,
58        base: Path,
59        shards: usize,
60    ) -> Result<Cluster<T>> {
61        let publisher = publisher.clone();
62        let (tx, cmd) = mpsc::channel(3);
63        let id = Uuid::new_v4();
64        let our_path = base.append(&uuid_string(id));
65        let us = publisher.publish(our_path.clone(), Value::Null)?;
66        let ctrack = ChangeTracker::new(base);
67        publisher.writes(us.id(), tx);
68        publisher.flushed().await;
69        let others = HashMap::new();
70        let t = PhantomData;
71        let mut t = Cluster {
72            t,
73            ctrack,
74            publisher,
75            subscriber,
76            our_path,
77            us,
78            cmd,
79            others,
80            primary: true,
81        };
82        while t.subscribed_others() < shards {
83            info!("waiting for {} other shards", shards);
84            t.poll_members().await?;
85            time::sleep(std::time::Duration::from_millis(100)).await;
86        }
87        Ok(t)
88    }
89
90    /// Return your own path
91    pub fn path(&self) -> Path {
92        self.our_path.clone()
93    }
94
95    /// Returns true if this cluster member is the primary, false
96    /// otherwise. May change after `poll_members`.
97    pub fn primary(&self) -> bool {
98        self.primary
99    }
100
101    fn subscribed_others(&self) -> usize {
102        self.others.len()
103            - self.others.values().filter(|d| d.last() == Event::Unsubscribed).count()
104    }
105
106    pub fn others(&self) -> usize {
107        self.publisher.subscribed_len(&self.us.id())
108    }
109
110    /// Poll the resolvers to see if any new members have joined the
111    /// cluster, return true if new members have potentially joined,
112    /// false if no new members have joined.
113    pub async fn poll_members(&mut self) -> Result<bool> {
114        if !self.subscriber.resolver().check_changed(&mut self.ctrack).await? {
115            Ok(false)
116        } else {
117            let path = self.ctrack.path().clone();
118            let mut l = self.subscriber.resolver().list(path).await?;
119            let all = l.drain(..).filter(|p| p != &self.our_path).collect::<HashSet<_>>();
120            self.others.retain(|p, _| all.contains(p));
121            for path in all {
122                if !self.others.contains_key(&path) {
123                    let dv = self.subscriber.subscribe(path.clone());
124                    self.others.insert(path, dv);
125                }
126            }
127            let mut paths =
128                iter::once(&self.our_path).chain(self.others.keys()).collect::<Vec<_>>();
129            paths.sort();
130            self.primary = self.our_path == *paths[0];
131            Ok(true)
132        }
133    }
134
135    /// Wait for some commands from other members of the cluster.
136    pub async fn wait_cmds(&mut self) -> Result<Vec<T>> {
137        match self.cmd.next().await {
138            None => bail!("cluster publish write stream ended"),
139            Some(mut reqs) => {
140                let mut cmds = Vec::new();
141                for req in reqs.drain(..) {
142                    if let Value::Bytes(b) = &req.value {
143                        if let Ok(cmd) = Pack::decode(&mut &***b) {
144                            cmds.push(cmd);
145                            continue;
146                        }
147                    }
148                    warn!("ignoring invalid cmd: {:?}", &req.value);
149                }
150                Ok(cmds)
151            }
152        }
153    }
154
155    /// Send a command out to other members of the cluster.
156    pub fn send_cmd(&self, cmd: &T) {
157        if self.others.len() > 0 {
158            let cmd: Value = utils::pack(cmd).unwrap().freeze().into();
159            for other in self.others.values() {
160                other.write(cmd.clone());
161            }
162        }
163    }
164
165    /// Send a command to just one other, identified by it's path.
166    pub fn send_cmd_to_one(&self, path: &Path, cmd: &T) {
167        if let Some(other) = self.others.get(path) {
168            let cmd = utils::pack(cmd).unwrap().freeze().into();
169            other.write(cmd);
170        }
171    }
172}