netidx_protocols/
cluster.rs1use anyhow::Result;
2use futures::{channel::mpsc, prelude::*};
3use log::{info, warn};
4use netidx::{
5 pack::Pack,
6 path::Path,
7 publisher::{Publisher, Val, Value, WriteRequest},
8 resolver_client::ChangeTracker,
9 subscriber::{Dval, Event, Subscriber},
10 utils,
11};
12use poolshark::Pooled;
13use std::{
14 collections::{HashMap, HashSet},
15 iter,
16 marker::PhantomData,
17};
18use tokio::time;
19use uuid::Uuid;
20
21pub fn uuid_string(id: Uuid) -> String {
22 use uuid::fmt::Simple;
23 let mut buf = [0u8; Simple::LENGTH];
24 Simple::from_uuid(id).encode_lower(&mut buf).into()
25}
26
27pub struct Cluster<T: Pack> {
41 t: PhantomData<T>,
42 ctrack: ChangeTracker,
43 publisher: Publisher,
44 subscriber: Subscriber,
45 our_path: Path,
46 us: Val,
47 others: HashMap<Path, Dval>,
48 cmd: mpsc::Receiver<Pooled<Vec<WriteRequest>>>,
49 primary: bool,
50}
51
52impl<T: Pack> Cluster<T> {
53 pub async fn new(
56 publisher: &Publisher,
57 subscriber: Subscriber,
58 base: Path,
59 shards: usize,
60 ) -> Result<Cluster<T>> {
61 let publisher = publisher.clone();
62 let (tx, cmd) = mpsc::channel(3);
63 let id = Uuid::new_v4();
64 let our_path = base.append(&uuid_string(id));
65 let us = publisher.publish(our_path.clone(), Value::Null)?;
66 let ctrack = ChangeTracker::new(base);
67 publisher.writes(us.id(), tx);
68 publisher.flushed().await;
69 let others = HashMap::new();
70 let t = PhantomData;
71 let mut t = Cluster {
72 t,
73 ctrack,
74 publisher,
75 subscriber,
76 our_path,
77 us,
78 cmd,
79 others,
80 primary: true,
81 };
82 while t.subscribed_others() < shards {
83 info!("waiting for {} other shards", shards);
84 t.poll_members().await?;
85 time::sleep(std::time::Duration::from_millis(100)).await;
86 }
87 Ok(t)
88 }
89
90 pub fn path(&self) -> Path {
92 self.our_path.clone()
93 }
94
95 pub fn primary(&self) -> bool {
98 self.primary
99 }
100
101 fn subscribed_others(&self) -> usize {
102 self.others.len()
103 - self.others.values().filter(|d| d.last() == Event::Unsubscribed).count()
104 }
105
106 pub fn others(&self) -> usize {
107 self.publisher.subscribed_len(&self.us.id())
108 }
109
110 pub async fn poll_members(&mut self) -> Result<bool> {
114 if !self.subscriber.resolver().check_changed(&mut self.ctrack).await? {
115 Ok(false)
116 } else {
117 let path = self.ctrack.path().clone();
118 let mut l = self.subscriber.resolver().list(path).await?;
119 let all = l.drain(..).filter(|p| p != &self.our_path).collect::<HashSet<_>>();
120 self.others.retain(|p, _| all.contains(p));
121 for path in all {
122 if !self.others.contains_key(&path) {
123 let dv = self.subscriber.subscribe(path.clone());
124 self.others.insert(path, dv);
125 }
126 }
127 let mut paths =
128 iter::once(&self.our_path).chain(self.others.keys()).collect::<Vec<_>>();
129 paths.sort();
130 self.primary = self.our_path == *paths[0];
131 Ok(true)
132 }
133 }
134
135 pub async fn wait_cmds(&mut self) -> Result<Vec<T>> {
137 match self.cmd.next().await {
138 None => bail!("cluster publish write stream ended"),
139 Some(mut reqs) => {
140 let mut cmds = Vec::new();
141 for req in reqs.drain(..) {
142 if let Value::Bytes(b) = &req.value {
143 if let Ok(cmd) = Pack::decode(&mut &***b) {
144 cmds.push(cmd);
145 continue;
146 }
147 }
148 warn!("ignoring invalid cmd: {:?}", &req.value);
149 }
150 Ok(cmds)
151 }
152 }
153 }
154
155 pub fn send_cmd(&self, cmd: &T) {
157 if self.others.len() > 0 {
158 let cmd: Value = utils::pack(cmd).unwrap().freeze().into();
159 for other in self.others.values() {
160 other.write(cmd.clone());
161 }
162 }
163 }
164
165 pub fn send_cmd_to_one(&self, path: &Path, cmd: &T) {
167 if let Some(other) = self.others.get(path) {
168 let cmd = utils::pack(cmd).unwrap().freeze().into();
169 other.write(cmd);
170 }
171 }
172}