1mod protocol;
2
3use anyhow::{Error, Result, anyhow};
4use base64ct::{Base64, Encoding};
5use crossbeam_channel::{Receiver, Sender, unbounded};
6use log::*;
7use protocol::*;
8use spindle_rs::*;
9use std::collections::HashMap;
10use std::fmt::Write as _;
11use std::io::{BufReader, prelude::*};
12use std::net::{IpAddr, Ipv4Addr, SocketAddr, TcpListener, TcpStream};
13use std::sync::atomic::{AtomicUsize, Ordering};
14use std::sync::mpsc;
15use std::sync::{Arc, Mutex};
16use std::thread;
17use std::time::{Duration, Instant};
18use uuid::Uuid;
19
20#[macro_use(defer)]
21extern crate scopeguard;
22
/// Messages delivered on the optional `mpsc::Sender<Comms>` channels that the
/// caller registers through [`OpBuilder::tx_toleader`] and
/// [`OpBuilder::tx_broadcast`].
///
/// Both variants carry the raw payload in `msg` and a `tx` sender.
/// NOTE(review): `tx` is presumably where the receiving side writes its reply;
/// the code that constructs these values is not in this file — confirm against
/// the `protocol` module.
#[derive(Debug)]
pub enum Comms {
    /// A payload addressed to the leader.
    ToLeader { msg: Vec<u8>, tx: mpsc::Sender<Vec<u8>> },
    /// A payload addressed to every member.
    Broadcast { msg: Vec<u8>, tx: mpsc::Sender<Vec<u8>> },
}
28
/// Work items consumed by the worker threads that `Op::run` spawns.
#[derive(Debug)]
enum WorkerCtrl {
    /// An accepted connection from the internal TCP server; the worker hands
    /// it to `handle_protocol`.
    TcpServer(TcpStream),
    /// Address of a member to ping; the worker removes the member from the
    /// membership map when the ping is not answered with `+1`.
    PingMember(String),
    /// Payload to forward to the current leader; the leader's reply (or an
    /// error line starting with `-`) is sent back on `tx`.
    ToLeader { msg: Vec<u8>, tx: Sender<Vec<u8>> },
}
35
/// A cluster member built on a spindle lock: it tracks which node currently
/// holds the lock (the leader), keeps a membership list in sync over an
/// internal TCP protocol, and can forward messages to the leader.
///
/// Construct it with [`Op::builder`] / [`OpBuilder`], then call [`Op::run`].
pub struct Op {
    /// Database identifier, passed through to the lock builder.
    db: String,
    /// Lock table name, passed through to the lock builder.
    table: String,
    /// Logical name; the lock is created as `hedge/spindle/{name}`.
    name: String,
    /// This node's identity. `run` also uses it as the TCP listen address and
    /// as this node's key in `members`.
    id: String,
    /// Holds the single lock created by `run`; empty until `run` is called.
    lock: Vec<Arc<Mutex<Lock>>>,
    /// Latest value received on the lock's leader channel; `run` treats `1`
    /// as "this node is the leader".
    leader: Arc<AtomicUsize>,
    /// Lock lease in milliseconds; `run` substitutes 3000 when this is 0.
    lease_ms: u64,
    /// Interval of the membership sync loop in milliseconds; `run` falls back
    /// to the effective lease when this is 0.
    sync_ms: u64,
    /// Known member ids. The value is always written as 0 in this file.
    members: Arc<Mutex<HashMap<String, usize>>>,
    /// Sender side of the worker queue; empty until `run` is called.
    tx_worker: Vec<Sender<WorkerCtrl>>,
    /// Optional caller-supplied channel, handed to `handle_protocol`.
    tx_toleader: Option<mpsc::Sender<Comms>>,
    /// Optional caller-supplied channel. It is stored but not read by any code
    /// visible in this file.
    tx_broadcast: Option<mpsc::Sender<Comms>>,
    /// 0 until `run` has finished its setup, 1 afterwards.
    active: Arc<AtomicUsize>,
}
51
52impl Op {
53 pub fn builder() -> OpBuilder {
55 OpBuilder::default()
56 }
57
58 pub fn run(&mut self) -> Result<()> {
60 {
61 let members = self.members.clone();
62 let id = self.id.clone();
63 if let Ok(mut v) = members.lock() {
64 v.insert(id, 0);
65 }
66 }
67
68 let mut lock_name = String::new();
69 write!(&mut lock_name, "hedge/spindle/{}", self.name.clone()).unwrap();
70 let mut lease_ms = self.lease_ms;
71 if lease_ms == 0 {
72 lease_ms = 3_000;
73 }
74
75 let (tx_ldr, rx_ldr) = mpsc::channel();
76 self.lock = vec![Arc::new(Mutex::new(
77 LockBuilder::new()
78 .db(self.db.clone())
79 .table(self.table.clone())
80 .name(lock_name)
81 .id(self.id.clone())
82 .lease_ms(lease_ms)
83 .leader_tx(Some(tx_ldr))
84 .build(),
85 ))];
86
87 {
88 let lc = self.lock[0].clone();
89 if let Ok(mut v) = lc.lock() {
90 v.run()?;
91 }
92 }
93
94 let leader_setter = self.leader.clone();
96 thread::spawn(move || {
97 loop {
98 let ldr = rx_ldr.recv();
99 match ldr {
100 Ok(v) => leader_setter.store(v, Ordering::Relaxed),
101 Err(_) => {}
102 }
103 }
104 });
105
106 let (tx, rx): (Sender<WorkerCtrl>, Receiver<WorkerCtrl>) = unbounded();
107 let rxs: Arc<Mutex<HashMap<usize, Receiver<WorkerCtrl>>>> = Arc::new(Mutex::new(HashMap::new()));
108 let cpus = num_cpus::get();
109
110 self.tx_worker = vec![tx.clone()];
111
112 for i in 0..cpus {
113 let recv = rxs.clone();
114
115 {
116 let mut rv = recv.lock().unwrap();
117 rv.insert(i, rx.clone());
118 }
119 }
120
121 for i in 0..cpus {
123 let lock = self.lock[0].clone();
124 let recv = rxs.clone();
125 let members = self.members.clone();
126 let leader = self.leader.clone();
127 let toleader = match self.tx_toleader.clone() {
128 Some(v) => vec![v.clone()],
129 None => vec![],
130 };
131
132 thread::spawn(move || {
133 loop {
134 let mut rx: Option<Receiver<WorkerCtrl>> = None;
135
136 {
137 let rxval = match recv.lock() {
138 Ok(v) => v,
139 Err(e) => {
140 error!("T{i}: lock failed: {e}");
141 break;
142 }
143 };
144
145 if let Some(v) = rxval.get(&i) {
146 rx = Some(v.clone());
147 }
148 }
149
150 match rx.unwrap().recv().unwrap() {
151 WorkerCtrl::TcpServer(stream) => {
152 let start = Instant::now();
153
154 defer! {
155 info!("[T{i}]: tcp took {:?}", start.elapsed());
156 }
157
158 handle_protocol(
159 i,
160 stream,
161 leader.load(Ordering::Acquire),
162 members.clone(),
163 toleader.clone(),
164 );
165 }
166 WorkerCtrl::PingMember(name) => {
167 let mut delete = false;
168 let start = Instant::now();
169
170 defer! {
171 info!("[T{i}]: ping took {:?}", start.elapsed());
172 }
173
174 'onetime: loop {
175 let hp: Vec<&str> = name.split(":").collect();
176 let hh: Vec<&str> = hp[0].split(".").collect();
177 let ip = SocketAddr::new(
178 IpAddr::V4(Ipv4Addr::new(
179 hh[0].parse::<u8>().unwrap(),
180 hh[1].parse::<u8>().unwrap(),
181 hh[2].parse::<u8>().unwrap(),
182 hh[3].parse::<u8>().unwrap(),
183 )),
184 hp[1].parse::<u16>().unwrap(),
185 );
186
187 let mut stream = match TcpStream::connect_timeout(&ip, Duration::from_secs(5)) {
188 Ok(v) => v,
189 Err(e) => {
190 error!("connect_timeout to {name} failed: {e}");
191 delete = true;
192 break 'onetime;
193 }
194 };
195
196 let mut send = String::new();
197 write!(&mut send, "{}\n", CMD_PING).unwrap();
198 if let Err(_) = stream.write_all(send.as_bytes()) {
199 break 'onetime;
200 }
201
202 let mut reader = BufReader::new(&stream);
203 let mut resp = String::new();
204 reader.read_line(&mut resp).unwrap();
205
206 if !resp.starts_with("+1") {
207 delete = true
208 }
209
210 break 'onetime;
211 }
212
213 if delete {
214 let members = members.clone();
215 if let Ok(mut v) = members.lock() {
216 v.remove(&name);
217 }
218 }
219 }
220 WorkerCtrl::ToLeader { msg, tx } => {
221 let start = Instant::now();
222
223 defer! {
224 info!("[T{i}]: toleader took {:?}", start.elapsed());
225 }
226
227 'onetime: loop {
228 let mut leader = String::new();
229
230 {
231 if let Ok(v) = lock.lock() {
232 let (_, writer, _) = v.has_lock();
233 write!(&mut leader, "{}", writer).unwrap();
234 }
235 }
236
237 if leader.is_empty() {
238 tx.send("-no leader".as_bytes().to_vec()).unwrap();
239 break 'onetime;
240 }
241
242 let encoded = Base64::encode_string(&msg);
243
244 let hp: Vec<&str> = leader.split(":").collect();
245 let hh: Vec<&str> = hp[0].split(".").collect();
246 let leader_ip = SocketAddr::new(
247 IpAddr::V4(Ipv4Addr::new(
248 hh[0].parse::<u8>().unwrap(),
249 hh[1].parse::<u8>().unwrap(),
250 hh[2].parse::<u8>().unwrap(),
251 hh[3].parse::<u8>().unwrap(),
252 )),
253 hp[1].parse::<u16>().unwrap(),
254 );
255
256 let mut stream = match TcpStream::connect_timeout(&leader_ip, Duration::from_secs(5)) {
257 Ok(v) => v,
258 Err(e) => {
259 let mut err = String::new();
260 write!(&mut err, "-connect_timeout failed: {e}").unwrap();
261 tx.send(err.as_bytes().to_vec()).unwrap();
262 break 'onetime;
263 }
264 };
265
266 let mut send = String::new();
267 write!(&mut send, "{}{}\n", CMD_SEND, encoded).unwrap();
268 if let Ok(_) = stream.write_all(send.as_bytes()) {
269 let mut reader = BufReader::new(&stream);
270 let mut resp = String::new();
271 reader.read_line(&mut resp).unwrap();
272 tx.send(resp[..resp.len() - 1].as_bytes().to_vec()).unwrap();
273 }
274
275 break 'onetime;
276 }
277 }
278 }
279 }
280 });
281 }
282
283 let tx_tcp = tx.clone();
285 let host = self.id.clone();
286 thread::spawn(move || {
287 info!("starting internal TCP server");
288 let listen = TcpListener::bind(host).unwrap();
289 for stream in listen.incoming() {
290 let stream = match stream {
291 Ok(v) => v,
292 Err(e) => {
293 error!("stream failed: {e}");
294 continue;
295 }
296 };
297
298 tx_tcp.send(WorkerCtrl::TcpServer(stream)).unwrap();
299 }
300 });
301
302 let mut sync_ms = self.sync_ms;
304 if sync_ms == 0 {
305 sync_ms = lease_ms;
306 }
307
308 let tx_ensure = tx.clone();
309 let lock = self.lock[0].clone();
310 let leader_track = self.leader.clone();
311 let id_1 = self.id.clone();
312 let id_0 = self.id.clone();
313 let members = self.members.clone();
314 thread::spawn(move || {
315 loop {
316 let start = Instant::now();
317
318 defer! {
319 let mut pause = sync_ms;
320 let latency = start.elapsed().as_millis() as u64;
321 if latency < sync_ms && (pause-latency) > 0 {
322 pause -= latency;
323 }
324
325 info!("members took {:?}", start.elapsed());
326 thread::sleep(Duration::from_millis(pause));
327 }
328
329 if leader_track.load(Ordering::Acquire) == 1 {
330 let mut mm: Vec<String> = Vec::new();
332
333 {
334 if let Ok(v) = members.clone().lock() {
335 for (k, _) in &*v {
336 if k != &id_1 {
337 mm.push(k.clone());
338 }
339 }
340 }
341 }
342
343 for name in mm {
344 tx_ensure.send(WorkerCtrl::PingMember(name)).unwrap();
345 }
346
347 {
348 if let Ok(v) = members.clone().lock() {
349 info!("{} member(s) tracked", v.len());
350 }
351 }
352 } else {
353 let mut leader = String::new();
355
356 {
357 if let Ok(v) = lock.lock() {
358 let (_, writer, _) = v.has_lock();
359 write!(&mut leader, "{}", writer).unwrap();
360 }
361 }
362
363 if leader.is_empty() {
364 continue;
365 }
366
367 let hp: Vec<&str> = leader.split(":").collect();
368 let hh: Vec<&str> = hp[0].split(".").collect();
369 let leader_ip = SocketAddr::new(
370 IpAddr::V4(Ipv4Addr::new(
371 hh[0].parse::<u8>().unwrap(),
372 hh[1].parse::<u8>().unwrap(),
373 hh[2].parse::<u8>().unwrap(),
374 hh[3].parse::<u8>().unwrap(),
375 )),
376 hp[1].parse::<u16>().unwrap(),
377 );
378
379 let mut stream = match TcpStream::connect_timeout(&leader_ip, Duration::from_secs(5)) {
380 Ok(v) => v,
381 Err(e) => {
382 error!("connect_timeout failed: {e}");
383 continue;
384 }
385 };
386
387 let mut send = String::new();
388 write!(&mut send, "{}{}\n", CMD_PING, id_0).unwrap();
389 if let Ok(_) = stream.write_all(send.as_bytes()) {
390 let mut reader = BufReader::new(&stream);
391 let mut resp = String::new();
392 reader.read_line(&mut resp).unwrap();
393
394 info!("response: {resp:?}");
395
396 if resp.chars().nth(0).unwrap() != '+' {
397 continue;
398 }
399
400 let mm: Vec<&str> = resp[1..resp.len() - 1].split(",").collect();
401 if mm.len() > 0 {
402 if let Ok(mut v) = members.clone().lock() {
403 v.clear();
404 for m in mm {
405 if m.len() > 0 && !m.starts_with("+") {
406 v.insert(m.to_string(), 0);
407 }
408 }
409 }
410 }
411
412 {
413 if let Ok(v) = members.clone().lock() {
414 info!("{} member(s) tracked", v.len());
415 }
416 }
417 }
418 }
419 }
420 });
421
422 let active = self.active.clone();
424 active.store(1, Ordering::Relaxed);
425
426 Ok(())
427 }
428
429 pub fn has_lock(&self) -> (bool, String, u64) {
431 let active = self.active.clone();
432 if active.load(Ordering::Acquire) == 0 {
433 return (false, String::from(""), 0);
434 }
435
436 let lock = self.lock[0].clone();
437 if let Ok(v) = lock.lock() {
438 return v.has_lock();
439 }
440
441 return (false, String::from(""), 0);
442 }
443
444 pub fn members(&mut self) -> Vec<String> {
446 let mut ret: Vec<String> = Vec::new();
447 let active = self.active.clone();
448 if active.load(Ordering::Acquire) == 0 {
449 return ret;
450 }
451
452 if let Ok(v) = self.members.lock() {
453 for (k, _) in &*v {
454 ret.push(k.clone());
455 }
456 }
457
458 return ret;
459 }
460
461 pub fn send(&mut self, msg: Vec<u8>) -> Result<Vec<u8>, Error> {
463 let active = self.active.clone();
464 if active.load(Ordering::Acquire) == 0 {
465 return Err(anyhow!("still initializing"));
466 }
467
468 let (tx, rx): (Sender<Vec<u8>>, Receiver<Vec<u8>>) = unbounded();
469 self.tx_worker[0].send(WorkerCtrl::ToLeader { msg, tx }).unwrap();
470 let r = rx.recv().unwrap();
471 match r[0] {
472 b'+' => return Ok(r[1..].to_vec()),
473 b'-' => return Err(anyhow!(String::from_utf8(r[1..].to_vec()).unwrap())),
474 _ => return Err(anyhow!("unknown")),
475 }
476 }
477
478 pub fn close(&mut self) {
479 let lock = self.lock[0].clone();
480 if let Ok(mut v) = lock.lock() {
481 v.close();
482 }
483 }
484}
485
/// Builder for [`Op`]. Every field starts at its `Default` value (empty
/// string, 0, or `None`) and is set through the method of the same name.
#[derive(Default)]
pub struct OpBuilder {
    /// Database identifier handed to the lock.
    db: String,
    /// Lock table name.
    table: String,
    /// Logical name of the group.
    name: String,
    /// Node identity; `build` generates a UUID when this is left empty.
    id: String,
    /// Lock lease in milliseconds.
    lease_ms: u64,
    /// Membership sync interval in milliseconds.
    sync_ms: u64,
    /// Optional channel for `Comms` values, stored on the built `Op`.
    tx_toleader: Option<mpsc::Sender<Comms>>,
    /// Optional channel for `Comms` values, stored on the built `Op`.
    tx_broadcast: Option<mpsc::Sender<Comms>>,
}
498
499impl OpBuilder {
500 pub fn new() -> OpBuilder {
501 OpBuilder::default()
502 }
503
504 pub fn db(mut self, db: String) -> OpBuilder {
506 self.db = db;
507 self
508 }
509
510 pub fn table(mut self, table: String) -> OpBuilder {
512 self.table = table;
513 self
514 }
515
516 pub fn name(mut self, name: String) -> OpBuilder {
518 self.name = name;
519 self
520 }
521
522 pub fn id(mut self, id: String) -> OpBuilder {
524 self.id = id;
525 self
526 }
527
528 pub fn lease_ms(mut self, ms: u64) -> OpBuilder {
530 self.lease_ms = ms;
531 self
532 }
533
534 pub fn sync_ms(mut self, ms: u64) -> OpBuilder {
536 self.sync_ms = ms;
537 self
538 }
539
540 pub fn tx_toleader(mut self, tx: Option<mpsc::Sender<Comms>>) -> OpBuilder {
542 self.tx_toleader = tx;
543 self
544 }
545
546 pub fn tx_broadcast(mut self, tx: Option<mpsc::Sender<Comms>>) -> OpBuilder {
548 self.tx_broadcast = tx;
549 self
550 }
551
552 pub fn build(self) -> Op {
553 Op {
554 db: self.db,
555 table: self.table,
556 name: self.name,
557 id: if self.id != "" {
558 self.id
559 } else {
560 let id = Uuid::new_v4();
561 id.to_string()
562 },
563 lock: vec![],
564 leader: Arc::new(AtomicUsize::new(0)),
565 lease_ms: self.sync_ms,
566 sync_ms: self.sync_ms,
567 members: Arc::new(Mutex::new(HashMap::new())),
568 tx_worker: vec![],
569 tx_toleader: self.tx_toleader,
570 tx_broadcast: self.tx_broadcast,
571 active: Arc::new(AtomicUsize::new(0)),
572 }
573 }
574}
575
#[cfg(test)]
mod tests {
    use super::*;

    /// An `Op` that was built but never started must report that it does not
    /// hold the lock and must return a zero token.
    #[test]
    fn no_run() {
        let builder = OpBuilder::new()
            .db(String::from("projects/p/instances/i/databases/db"))
            .table(String::from("locktable"))
            .name(String::from("hedge-rs"))
            .id(String::from(":8080"))
            .lease_ms(3_000);
        let op = builder.build();

        let (locked, _, token) = op.has_lock();
        assert!(!locked);
        assert_eq!(token, 0);
    }
}