hedge_rs/
lib.rs

1mod protocol;
2
3use anyhow::{Error, Result, anyhow};
4use base64ct::{Base64, Encoding};
5use crossbeam_channel::{Receiver, Sender, unbounded};
6use log::*;
7use protocol::*;
8use spindle_rs::*;
9use std::collections::HashMap;
10use std::fmt::Write as _;
11use std::io::{BufReader, prelude::*};
12use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, TcpStream};
13use std::sync::atomic::{AtomicUsize, Ordering};
14use std::sync::mpsc;
15use std::sync::{Arc, Mutex};
16use std::thread;
17use std::time::{Duration, Instant};
18use uuid::Uuid;
19
20#[macro_use(defer)]
21extern crate scopeguard;
22
/// Messages carried by the application-facing channels that are registered
/// through `OpBuilder::tx_toleader` and `OpBuilder::tx_broadcast`.
///
/// Both variants pair a raw payload with a standard-library sender.
/// NOTE(review): `tx` is presumably the channel on which the application
/// sends its reply; confirm against the `protocol` module.
#[derive(Debug)]
pub enum Comms {
    /// A payload that was addressed to the leader.
    ToLeader {
        msg: Vec<u8>,
        tx: mpsc::Sender<Vec<u8>>,
    },
    /// A payload that was broadcast to the group.
    Broadcast {
        msg: Vec<u8>,
        tx: mpsc::Sender<Vec<u8>>,
    },
}
28
29#[derive(Debug)]
30enum WorkerCtrl {
31    TcpServer(TcpStream),
32    PingMember(String),
33    ToLeader { msg: Vec<u8>, tx: Sender<Vec<u8>> },
34}
35
36pub struct Op {
37    db: String,
38    table: String,
39    name: String,
40    id: String,
41    lock: Vec<Arc<Mutex<Lock>>>,
42    leader: Arc<AtomicUsize>,
43    lease_ms: u64,
44    sync_ms: u64,
45    members: Arc<Mutex<HashMap<String, usize>>>,
46    tx_worker: Vec<Sender<WorkerCtrl>>,
47    tx_toleader: Option<mpsc::Sender<Comms>>,
48    tx_broadcast: Option<mpsc::Sender<Comms>>,
49    active: Arc<AtomicUsize>,
50}
51
52impl Op {
53    /// Allows for discovery of the builder.
54    pub fn builder() -> OpBuilder {
55        OpBuilder::default()
56    }
57
58    /// TODO:
59    pub fn run(&mut self) -> Result<()> {
60        {
61            let members = self.members.clone();
62            let id = self.id.clone();
63            if let Ok(mut v) = members.lock() {
64                v.insert(id, 0);
65            }
66        }
67
68        let mut lock_name = String::new();
69        write!(&mut lock_name, "hedge/spindle/{}", self.name.clone()).unwrap();
70        let mut lease_ms = self.lease_ms;
71        if lease_ms == 0 {
72            lease_ms = 3_000;
73        }
74
75        let (tx_ldr, rx_ldr) = mpsc::channel();
76        self.lock = vec![Arc::new(Mutex::new(
77            LockBuilder::new()
78                .db(self.db.clone())
79                .table(self.table.clone())
80                .name(lock_name)
81                .id(self.id.clone())
82                .lease_ms(lease_ms)
83                .leader_tx(Some(tx_ldr))
84                .build(),
85        ))];
86
87        {
88            let lc = self.lock[0].clone();
89            if let Ok(mut v) = lc.lock() {
90                v.run()?;
91            }
92        }
93
94        // We will use the channel-style callback from spindle_rs.
95        let leader_setter = self.leader.clone();
96        thread::spawn(move || {
97            loop {
98                let ldr = rx_ldr.recv();
99                match ldr {
100                    Ok(v) => leader_setter.store(v, Ordering::Relaxed),
101                    Err(_) => {}
102                }
103            }
104        });
105
106        let (tx, rx): (Sender<WorkerCtrl>, Receiver<WorkerCtrl>) = unbounded();
107        let rxs: Arc<Mutex<HashMap<usize, Receiver<WorkerCtrl>>>> = Arc::new(Mutex::new(HashMap::new()));
108        let cpus = num_cpus::get();
109
110        self.tx_worker = vec![tx.clone()];
111
112        for i in 0..cpus {
113            let recv = rxs.clone();
114
115            {
116                let mut rv = recv.lock().unwrap();
117                rv.insert(i, rx.clone());
118            }
119        }
120
121        // Start our worker threads for our TCP server.
122        for i in 0..cpus {
123            let lock = self.lock[0].clone();
124            let recv = rxs.clone();
125            let members = self.members.clone();
126            let leader = self.leader.clone();
127            let toleader = match self.tx_toleader.clone() {
128                Some(v) => vec![v.clone()],
129                None => vec![],
130            };
131
132            thread::spawn(move || {
133                loop {
134                    let mut rx: Option<Receiver<WorkerCtrl>> = None;
135
136                    {
137                        let rxval = match recv.lock() {
138                            Ok(v) => v,
139                            Err(e) => {
140                                error!("T{i}: lock failed: {e}");
141                                break;
142                            }
143                        };
144
145                        if let Some(v) = rxval.get(&i) {
146                            rx = Some(v.clone());
147                        }
148                    }
149
150                    match rx.unwrap().recv().unwrap() {
151                        WorkerCtrl::TcpServer(stream) => {
152                            let start = Instant::now();
153
154                            defer! {
155                                info!("[T{i}]: tcp took {:?}", start.elapsed());
156                            }
157
158                            handle_protocol(
159                                i,
160                                stream,
161                                leader.load(Ordering::Acquire),
162                                members.clone(),
163                                toleader.clone(),
164                            );
165                        }
166                        WorkerCtrl::PingMember(name) => {
167                            let mut delete = false;
168                            let start = Instant::now();
169
170                            defer! {
171                                info!("[T{i}]: ping took {:?}", start.elapsed());
172                            }
173
174                            'onetime: loop {
175                                let hp: Vec<&str> = name.split(":").collect();
176                                let hh: Vec<&str> = hp[0].split(".").collect();
177                                let ip = SocketAddr::new(
178                                    IpAddr::V4(Ipv4Addr::new(
179                                        hh[0].parse::<u8>().unwrap(),
180                                        hh[1].parse::<u8>().unwrap(),
181                                        hh[2].parse::<u8>().unwrap(),
182                                        hh[3].parse::<u8>().unwrap(),
183                                    )),
184                                    hp[1].parse::<u16>().unwrap(),
185                                );
186
187                                let mut stream = match TcpStream::connect_timeout(&ip, Duration::from_secs(5)) {
188                                    Ok(v) => v,
189                                    Err(e) => {
190                                        error!("connect_timeout to {name} failed: {e}");
191                                        delete = true;
192                                        break 'onetime;
193                                    }
194                                };
195
196                                let mut send = String::new();
197                                write!(&mut send, "{}\n", CMD_PING).unwrap();
198                                if let Err(_) = stream.write_all(send.as_bytes()) {
199                                    break 'onetime;
200                                }
201
202                                let mut reader = BufReader::new(&stream);
203                                let mut resp = String::new();
204                                reader.read_line(&mut resp).unwrap();
205
206                                if !resp.starts_with("+1") {
207                                    delete = true
208                                }
209
210                                break 'onetime;
211                            }
212
213                            if delete {
214                                let members = members.clone();
215                                if let Ok(mut v) = members.lock() {
216                                    v.remove(&name);
217                                }
218                            }
219                        }
220                        WorkerCtrl::ToLeader { msg, tx } => {
221                            let start = Instant::now();
222
223                            defer! {
224                                info!("[T{i}]: toleader took {:?}", start.elapsed());
225                            }
226
227                            'onetime: loop {
228                                let mut leader = String::new();
229
230                                {
231                                    if let Ok(v) = lock.lock() {
232                                        let (_, writer, _) = v.has_lock();
233                                        write!(&mut leader, "{}", writer).unwrap();
234                                    }
235                                }
236
237                                if leader.is_empty() {
238                                    tx.send("-no leader".as_bytes().to_vec()).unwrap();
239                                    break 'onetime;
240                                }
241
242                                let encoded = Base64::encode_string(&msg);
243
244                                let hp: Vec<&str> = leader.split(":").collect();
245                                let hh: Vec<&str> = hp[0].split(".").collect();
246                                let leader_ip = SocketAddr::new(
247                                    IpAddr::V4(Ipv4Addr::new(
248                                        hh[0].parse::<u8>().unwrap(),
249                                        hh[1].parse::<u8>().unwrap(),
250                                        hh[2].parse::<u8>().unwrap(),
251                                        hh[3].parse::<u8>().unwrap(),
252                                    )),
253                                    hp[1].parse::<u16>().unwrap(),
254                                );
255
256                                let mut stream = match TcpStream::connect_timeout(&leader_ip, Duration::from_secs(5)) {
257                                    Ok(v) => v,
258                                    Err(e) => {
259                                        let mut err = String::new();
260                                        write!(&mut err, "-connect_timeout failed: {e}").unwrap();
261                                        tx.send(err.as_bytes().to_vec()).unwrap();
262                                        break 'onetime;
263                                    }
264                                };
265
266                                let mut send = String::new();
267                                write!(&mut send, "{}{}\n", CMD_SEND, encoded).unwrap();
268                                if let Ok(_) = stream.write_all(send.as_bytes()) {
269                                    let mut reader = BufReader::new(&stream);
270                                    let mut resp = String::new();
271                                    reader.read_line(&mut resp).unwrap();
272                                    tx.send(resp[..resp.len() - 1].as_bytes().to_vec()).unwrap();
273                                }
274
275                                break 'onetime;
276                            }
277                        }
278                    }
279                }
280            });
281        }
282
283        // Start our internal TCP server.
284        let tx_tcp = tx.clone();
285        let host = self.id.clone();
286        thread::spawn(move || {
287            info!("starting internal TCP server");
288            let listen = TcpListener::bind(host).unwrap();
289            for stream in listen.incoming() {
290                let stream = match stream {
291                    Ok(v) => v,
292                    Err(e) => {
293                        error!("stream failed: {e}");
294                        continue;
295                    }
296                };
297
298                tx_tcp.send(WorkerCtrl::TcpServer(stream)).unwrap();
299            }
300        });
301
302        // Start the member tracking and heartbeating thread.
303        let mut sync_ms = self.sync_ms;
304        if sync_ms == 0 {
305            sync_ms = lease_ms;
306        }
307
308        let tx_ensure = tx.clone();
309        let lock = self.lock[0].clone();
310        let leader_track = self.leader.clone();
311        let id_1 = self.id.clone();
312        let id_0 = self.id.clone();
313        let members = self.members.clone();
314        thread::spawn(move || {
315            loop {
316                let start = Instant::now();
317
318                defer! {
319                    let mut pause = sync_ms;
320                    let latency = start.elapsed().as_millis() as u64;
321                    if latency < sync_ms && (pause-latency) > 0 {
322                        pause -= latency;
323                    }
324
325                    info!("members took {:?}", start.elapsed());
326                    thread::sleep(Duration::from_millis(pause));
327                }
328
329                if leader_track.load(Ordering::Acquire) == 1 {
330                    // We are leader. Ensure liveness of all members.
331                    let mut mm: Vec<String> = Vec::new();
332
333                    {
334                        if let Ok(v) = members.clone().lock() {
335                            for (k, _) in &*v {
336                                if k != &id_1 {
337                                    mm.push(k.clone());
338                                }
339                            }
340                        }
341                    }
342
343                    for name in mm {
344                        tx_ensure.send(WorkerCtrl::PingMember(name)).unwrap();
345                    }
346
347                    {
348                        if let Ok(v) = members.clone().lock() {
349                            info!("{} member(s) tracked", v.len());
350                        }
351                    }
352                } else {
353                    // We're not leader. Send heartbeats to leader.
354                    let mut leader = String::new();
355
356                    {
357                        if let Ok(v) = lock.lock() {
358                            let (_, writer, _) = v.has_lock();
359                            write!(&mut leader, "{}", writer).unwrap();
360                        }
361                    }
362
363                    if leader.is_empty() {
364                        continue;
365                    }
366
367                    let hp: Vec<&str> = leader.split(":").collect();
368                    let hh: Vec<&str> = hp[0].split(".").collect();
369                    let leader_ip = SocketAddr::new(
370                        IpAddr::V4(Ipv4Addr::new(
371                            hh[0].parse::<u8>().unwrap(),
372                            hh[1].parse::<u8>().unwrap(),
373                            hh[2].parse::<u8>().unwrap(),
374                            hh[3].parse::<u8>().unwrap(),
375                        )),
376                        hp[1].parse::<u16>().unwrap(),
377                    );
378
379                    let mut stream = match TcpStream::connect_timeout(&leader_ip, Duration::from_secs(5)) {
380                        Ok(v) => v,
381                        Err(e) => {
382                            error!("connect_timeout failed: {e}");
383                            continue;
384                        }
385                    };
386
387                    let mut send = String::new();
388                    write!(&mut send, "{}{}\n", CMD_PING, id_0).unwrap();
389                    if let Ok(_) = stream.write_all(send.as_bytes()) {
390                        let mut reader = BufReader::new(&stream);
391                        let mut resp = String::new();
392                        reader.read_line(&mut resp).unwrap();
393
394                        info!("response: {resp:?}");
395
396                        if resp.chars().nth(0).unwrap() != '+' {
397                            continue;
398                        }
399
400                        let mm: Vec<&str> = resp[1..resp.len() - 1].split(",").collect();
401                        if mm.len() > 0 {
402                            if let Ok(mut v) = members.clone().lock() {
403                                v.clear();
404                                for m in mm {
405                                    if m.len() > 0 && !m.starts_with("+") {
406                                        v.insert(m.to_string(), 0);
407                                    }
408                                }
409                            }
410                        }
411
412                        {
413                            if let Ok(v) = members.clone().lock() {
414                                info!("{} member(s) tracked", v.len());
415                            }
416                        }
417                    }
418                }
419            }
420        });
421
422        // Finally, set the system active.
423        let active = self.active.clone();
424        active.store(1, Ordering::Relaxed);
425
426        Ok(())
427    }
428
429    /// Returns true if this instance got the lock, together with the name and lock token.
430    pub fn has_lock(&self) -> (bool, String, u64) {
431        let active = self.active.clone();
432        if active.load(Ordering::Acquire) == 0 {
433            return (false, String::from(""), 0);
434        }
435
436        let lock = self.lock[0].clone();
437        if let Ok(v) = lock.lock() {
438            return v.has_lock();
439        }
440
441        return (false, String::from(""), 0);
442    }
443
444    /// Returns a list of current members in the group/cluster.
445    pub fn members(&mut self) -> Vec<String> {
446        let mut ret: Vec<String> = Vec::new();
447        let active = self.active.clone();
448        if active.load(Ordering::Acquire) == 0 {
449            return ret;
450        }
451
452        if let Ok(v) = self.members.lock() {
453            for (k, _) in &*v {
454                ret.push(k.clone());
455            }
456        }
457
458        return ret;
459    }
460
461    /// TODO: Send to leader.
462    pub fn send(&mut self, msg: Vec<u8>) -> Result<Vec<u8>, Error> {
463        let active = self.active.clone();
464        if active.load(Ordering::Acquire) == 0 {
465            return Err(anyhow!("still initializing"));
466        }
467
468        let (tx, rx): (Sender<Vec<u8>>, Receiver<Vec<u8>>) = unbounded();
469        self.tx_worker[0].send(WorkerCtrl::ToLeader { msg, tx }).unwrap();
470        let r = rx.recv().unwrap();
471        match r[0] {
472            b'+' => return Ok(r[1..].to_vec()),
473            b'-' => return Err(anyhow!(String::from_utf8(r[1..].to_vec()).unwrap())),
474            _ => return Err(anyhow!("unknown")),
475        }
476    }
477
478    pub fn close(&mut self) {
479        let lock = self.lock[0].clone();
480        if let Ok(mut v) = lock.lock() {
481            v.close();
482        }
483    }
484}
485
486/// `LockBuilder` builds an instance of Lock with default values.
487#[derive(Default)]
488pub struct OpBuilder {
489    db: String,
490    table: String,
491    name: String,
492    id: String,
493    lease_ms: u64,
494    sync_ms: u64,
495    tx_toleader: Option<mpsc::Sender<Comms>>,
496    tx_broadcast: Option<mpsc::Sender<Comms>>,
497}
498
499impl OpBuilder {
500    pub fn new() -> OpBuilder {
501        OpBuilder::default()
502    }
503
504    /// Sets the internal lock's Spanner database URL.
505    pub fn db(mut self, db: String) -> OpBuilder {
506        self.db = db;
507        self
508    }
509
510    /// Sets the internal lock's Spanner table for backing storage.
511    pub fn table(mut self, table: String) -> OpBuilder {
512        self.table = table;
513        self
514    }
515
516    /// Sets the internal lock name.
517    pub fn name(mut self, name: String) -> OpBuilder {
518        self.name = name;
519        self
520    }
521
522    /// Sets this instance (or node) id. Format should be `host:port`.
523    pub fn id(mut self, id: String) -> OpBuilder {
524        self.id = id;
525        self
526    }
527
528    /// Sets the internal lock's leader lease timeout.
529    pub fn lease_ms(mut self, ms: u64) -> OpBuilder {
530        self.lease_ms = ms;
531        self
532    }
533
534    /// Sets the timeout for syncing member info across the group.
535    pub fn sync_ms(mut self, ms: u64) -> OpBuilder {
536        self.sync_ms = ms;
537        self
538    }
539
540    /// TODO:
541    pub fn tx_toleader(mut self, tx: Option<mpsc::Sender<Comms>>) -> OpBuilder {
542        self.tx_toleader = tx;
543        self
544    }
545
546    /// TODO:
547    pub fn tx_broadcast(mut self, tx: Option<mpsc::Sender<Comms>>) -> OpBuilder {
548        self.tx_broadcast = tx;
549        self
550    }
551
552    pub fn build(self) -> Op {
553        Op {
554            db: self.db,
555            table: self.table,
556            name: self.name,
557            id: if self.id != "" {
558                self.id
559            } else {
560                let id = Uuid::new_v4();
561                id.to_string()
562            },
563            lock: vec![],
564            leader: Arc::new(AtomicUsize::new(0)),
565            lease_ms: self.sync_ms,
566            sync_ms: self.sync_ms,
567            members: Arc::new(Mutex::new(HashMap::new())),
568            tx_worker: vec![],
569            tx_toleader: self.tx_toleader,
570            tx_broadcast: self.tx_broadcast,
571            active: Arc::new(AtomicUsize::new(0)),
572        }
573    }
574}
575
#[cfg(test)]
mod tests {
    use super::*;

    /// An `Op` that was built but never run must report that it holds no lock.
    #[test]
    fn no_run() {
        let builder = OpBuilder::new()
            .db("projects/p/instances/i/databases/db".to_string())
            .table("locktable".to_string())
            .name("hedge-rs".to_string())
            .id(":8080".to_string())
            .lease_ms(3_000);
        let unstarted = builder.build();

        let (held, _name, lock_token) = unstarted.has_lock();
        assert!(!held);
        assert_eq!(lock_token, 0);
    }
}