1mod auth;
29mod config;
30mod leader_follower;
31pub mod license;
32mod persistence;
33mod server;
34mod stats;
35pub mod store;
36mod subscribers;
37mod telemetry;
38mod worterbuch;
39
40use crate::stats::track_stats;
41pub use crate::worterbuch::*;
42pub use config::*;
43use leader_follower::{
44 ClientWriteCommand, LeaderSyncMessage, Mode, StateSync, initial_sync, process_leader_message,
45 run_cluster_sync_port, shutdown_on_stdin_close,
46};
47use miette::{IntoDiagnostic, Result, miette};
48use persistence::PERSISTENCE_LOCKED;
49use serde_json::{Value, json};
50use server::common::{CloneableWbApi, WbFunction};
51use std::{error::Error, sync::atomic::Ordering};
52use store::ValueEntry;
53use tokio::{
54 io::{AsyncBufReadExt, BufReader},
55 net::TcpStream,
56 select,
57 sync::{mpsc, oneshot},
58};
59use tokio_graceful_shutdown::{SubsystemBuilder, SubsystemHandle};
60use tracing::{Instrument, Level, debug, error, info, span};
61use uuid::Uuid;
62use worterbuch_common::{
63 KeySegment, PStateEvent, ProtocolVersion, SYSTEM_TOPIC_CLIENTS, SYSTEM_TOPIC_GRAVE_GOODS,
64 SYSTEM_TOPIC_LAST_WILL, SYSTEM_TOPIC_MODE, SYSTEM_TOPIC_ROOT, SYSTEM_TOPIC_ROOT_PREFIX,
65 SYSTEM_TOPIC_SUPPORTED_PROTOCOL_VERSION, error::WorterbuchError, receive_msg, topic,
66};
67
68pub const SUPPORTED_PROTOCOL_VERSIONS: [ProtocolVersion; 2] =
69 [ProtocolVersion::new(0, 11), ProtocolVersion::new(1, 1)];
70
71pub const INTERNAL_CLIENT_ID: Uuid = Uuid::nil();
72
73type ServerSubsystem = tokio_graceful_shutdown::NestedSubsystem<Box<dyn Error + Send + Sync>>;
74
75pub async fn run_worterbuch(subsys: SubsystemHandle, config: Config) -> Result<()> {
76 let config_pers = config.clone();
77
78 let channel_buffer_size = config.channel_buffer_size;
79
80 let use_persistence = config.use_persistence;
81
82 let mut worterbuch = if use_persistence && !config.follower {
83 persistence::load(config.clone()).await?
84 } else {
85 Worterbuch::with_config(config.clone())
86 };
87
88 let (api_tx, api_rx) = mpsc::channel(channel_buffer_size);
89 let api = CloneableWbApi::new(api_tx);
90
91 let web_server = if let Some(WsEndpoint {
92 endpoint: Endpoint {
93 tls,
94 bind_addr,
95 port,
96 },
97 public_addr,
98 }) = &config.ws_endpoint
99 {
100 let sapi = api.clone();
101 let tls = tls.to_owned();
102 let bind_addr = bind_addr.to_owned();
103 let port = port.to_owned();
104 let public_addr = public_addr.to_owned();
105 let ws_enabled = !config.follower;
106 Some(
107 subsys.start(SubsystemBuilder::new("webserver", move |subsys| {
108 server::poem::start(sapi, tls, bind_addr, port, public_addr, subsys, ws_enabled)
109 })),
110 )
111 } else {
112 None
113 };
114
115 if config.follower {
116 run_in_follower_mode(subsys, &mut worterbuch, api_rx, config, web_server).await?;
117 } else {
118 worterbuch
119 .set(
120 topic!(SYSTEM_TOPIC_ROOT, SYSTEM_TOPIC_SUPPORTED_PROTOCOL_VERSION),
121 serde_json::to_value(SUPPORTED_PROTOCOL_VERSIONS)
122 .unwrap_or_else(|e| Value::String(format!("Error serializing version: {e}"))),
123 INTERNAL_CLIENT_ID,
124 )
125 .await?;
126
127 if use_persistence {
128 let worterbuch_pers = api.clone();
129 subsys.start(SubsystemBuilder::new("persistence", |subsys| {
130 persistence::periodic(worterbuch_pers, config_pers, subsys)
131 }));
132 }
133
134 let worterbuch_uptime = api.clone();
135 subsys.start(SubsystemBuilder::new("stats", |subsys| {
136 track_stats(worterbuch_uptime, subsys)
137 }));
138
139 let tcp_server = if let Some(Endpoint {
140 tls: _,
141 bind_addr,
142 port,
143 }) = &config.tcp_endpoint
144 {
145 let sapi = api.clone();
146 let bind_addr = bind_addr.to_owned();
147 let port = port.to_owned();
148 Some(
149 subsys.start(SubsystemBuilder::new("tcpserver", move |subsys| {
150 server::tcp::start(sapi, bind_addr, port, subsys)
151 })),
152 )
153 } else {
154 None
155 };
156
157 #[cfg(target_family = "unix")]
158 let unix_socket = if let Some(UnixEndpoint { path }) = &config.unix_endpoint {
159 let sapi = api.clone();
160 let path = path.clone();
161 Some(
162 subsys.start(SubsystemBuilder::new("unixsocket", move |subsys| {
163 server::unix::start(sapi, path, subsys)
164 })),
165 )
166 } else {
167 None
168 };
169
170 #[cfg(not(target_family = "unix"))]
171 let unix_socket = None;
172
173 if config.leader {
174 run_in_leader_mode(
175 subsys,
176 worterbuch,
177 api_rx,
178 config,
179 web_server,
180 tcp_server,
181 unix_socket,
182 )
183 .await?;
184 } else {
185 run_in_regular_mode(
186 subsys,
187 worterbuch,
188 api_rx,
189 config,
190 web_server,
191 tcp_server,
192 unix_socket,
193 )
194 .await?;
195 }
196 }
197
198 debug!("worterbuch subsystem completed.");
199
200 Ok(())
201}
202
203async fn process_api_call(worterbuch: &mut Worterbuch, function: WbFunction) {
204 match function {
205 WbFunction::Get(key, tx) => {
206 tx.send(worterbuch.get(&key)).ok();
207 }
208 WbFunction::CGet(key, tx) => {
209 tx.send(worterbuch.cget(&key)).ok();
210 }
211 WbFunction::Set(key, value, client_id, tx, span) => {
212 tx.send(worterbuch.set(key, value, client_id).instrument(span).await)
213 .ok();
214 }
215 WbFunction::CSet(key, value, version, client_id, tx) => {
216 tx.send(worterbuch.cset(key, value, version, client_id).await)
217 .ok();
218 }
219 WbFunction::SPubInit(transaction_id, key, client_id, tx) => {
220 tx.send(worterbuch.spub_init(transaction_id, key, client_id).await)
221 .ok();
222 }
223 WbFunction::SPub(transaction_id, value, client_id, tx) => {
224 tx.send(worterbuch.spub(transaction_id, value, client_id).await)
225 .ok();
226 }
227 WbFunction::Publish(key, value, tx) => {
228 tx.send(worterbuch.publish(key, value).await).ok();
229 }
230 WbFunction::Ls(parent, tx) => {
231 tx.send(worterbuch.ls(&parent)).ok();
232 }
233 WbFunction::PLs(parent, tx) => {
234 tx.send(worterbuch.pls(&parent)).ok();
235 }
236 WbFunction::PGet(pattern, tx) => {
237 tx.send(worterbuch.pget(&pattern)).ok();
238 }
239 WbFunction::Subscribe(client_id, transaction_id, key, unique, live_only, tx) => {
240 tx.send(
241 worterbuch
242 .subscribe(client_id, transaction_id, key, unique, live_only)
243 .await,
244 )
245 .ok();
246 }
247 WbFunction::PSubscribe(client_id, transaction_id, pattern, unique, live_only, tx) => {
248 tx.send(
249 worterbuch
250 .psubscribe(client_id, transaction_id, pattern, unique, live_only)
251 .await,
252 )
253 .ok();
254 }
255 WbFunction::SubscribeLs(client_id, transaction_id, parent, tx) => {
256 tx.send(
257 worterbuch
258 .subscribe_ls(client_id, transaction_id, parent)
259 .await,
260 )
261 .ok();
262 }
263 WbFunction::Unsubscribe(client_id, transaction_id, tx) => {
264 tx.send(worterbuch.unsubscribe(client_id, transaction_id).await)
265 .ok();
266 }
267 WbFunction::UnsubscribeLs(client_id, transaction_id, tx) => {
268 tx.send(worterbuch.unsubscribe_ls(client_id, transaction_id))
269 .ok();
270 }
271 WbFunction::Delete(key, client_id, tx) => {
272 tx.send(worterbuch.delete(key, client_id).await).ok();
273 }
274 WbFunction::PDelete(pattern, client_id, tx) => {
275 tx.send(worterbuch.pdelete(pattern, client_id).await).ok();
276 }
277 WbFunction::Lock(key, client_id, tx) => {
278 tx.send(worterbuch.lock(key, client_id)).ok();
279 }
280 WbFunction::AcquireLock(key, client_id, tx) => {
281 tx.send(worterbuch.acquire_lock(key, client_id).await).ok();
282 }
283 WbFunction::ReleaseLock(key, client_id, tx) => {
284 tx.send(worterbuch.release_lock(key, client_id).await).ok();
285 }
286 WbFunction::Connected(client_id, remote_addr, protocol) => {
287 worterbuch
288 .connected(client_id, remote_addr, &protocol)
289 .await;
290 }
291 WbFunction::Disconnected(client_id, remote_addr) => {
292 worterbuch.disconnected(client_id, remote_addr).await.ok();
293 }
294 WbFunction::Config(tx) => {
295 tx.send(worterbuch.config().clone()).ok();
296 }
297 WbFunction::Export(tx, span) => {
298 let g = span.enter();
299 worterbuch.export_for_persistence(tx);
300 drop(g);
301 drop(span);
302 }
303 WbFunction::Import(json, tx) => {
304 tx.send(worterbuch.import(&json).await).ok();
305 }
306 WbFunction::Len(tx) => {
307 tx.send(worterbuch.len()).ok();
308 }
309 }
310}
311
312async fn process_api_call_as_follower(worterbuch: &mut Worterbuch, function: WbFunction) {
313 match function {
314 WbFunction::Get(key, tx) => {
315 tx.send(worterbuch.get(&key)).ok();
316 }
317 WbFunction::CGet(key, tx) => {
318 tx.send(worterbuch.cget(&key)).ok();
319 }
320 WbFunction::Set(_, _, _, tx, _) => {
321 tx.send(Err(WorterbuchError::NotLeader)).ok();
322 }
323 WbFunction::CSet(_, _, _, _, tx) => {
324 tx.send(Err(WorterbuchError::NotLeader)).ok();
325 }
326 WbFunction::SPubInit(_, _, _, tx) => {
327 tx.send(Err(WorterbuchError::NotLeader)).ok();
328 }
329 WbFunction::SPub(_, _, _, tx) => {
330 tx.send(Err(WorterbuchError::NotLeader)).ok();
331 }
332 WbFunction::Publish(_, _, tx) => {
333 tx.send(Err(WorterbuchError::NotLeader)).ok();
334 }
335 WbFunction::Ls(parent, tx) => {
336 tx.send(worterbuch.ls(&parent)).ok();
337 }
338 WbFunction::PLs(parent, tx) => {
339 tx.send(worterbuch.pls(&parent)).ok();
340 }
341 WbFunction::PGet(pattern, tx) => {
342 tx.send(worterbuch.pget(&pattern)).ok();
343 }
344 WbFunction::Subscribe(client_id, transaction_id, key, unique, live_only, tx) => {
345 tx.send(
346 worterbuch
347 .subscribe(client_id, transaction_id, key, unique, live_only)
348 .await,
349 )
350 .ok();
351 }
352 WbFunction::PSubscribe(client_id, transaction_id, pattern, unique, live_only, tx) => {
353 tx.send(
354 worterbuch
355 .psubscribe(client_id, transaction_id, pattern, unique, live_only)
356 .await,
357 )
358 .ok();
359 }
360 WbFunction::SubscribeLs(client_id, transaction_id, parent, tx) => {
361 tx.send(
362 worterbuch
363 .subscribe_ls(client_id, transaction_id, parent)
364 .await,
365 )
366 .ok();
367 }
368 WbFunction::Unsubscribe(client_id, transaction_id, tx) => {
369 tx.send(worterbuch.unsubscribe(client_id, transaction_id).await)
370 .ok();
371 }
372 WbFunction::UnsubscribeLs(client_id, transaction_id, tx) => {
373 tx.send(worterbuch.unsubscribe_ls(client_id, transaction_id))
374 .ok();
375 }
376 WbFunction::Lock(_, _, tx) => {
377 tx.send(Err(WorterbuchError::NotLeader)).ok();
378 }
379 WbFunction::AcquireLock(_, _, tx) => {
380 tx.send(Err(WorterbuchError::NotLeader)).ok();
381 }
382 WbFunction::ReleaseLock(_, _, tx) => {
383 tx.send(Err(WorterbuchError::NotLeader)).ok();
384 }
385 WbFunction::Delete(_, _, tx) => {
386 tx.send(Err(WorterbuchError::NotLeader)).ok();
387 }
388 WbFunction::PDelete(_, _, tx) => {
389 tx.send(Err(WorterbuchError::NotLeader)).ok();
390 }
391 WbFunction::Connected(client_id, remote_addr, protocol) => {
392 worterbuch
393 .connected(client_id, remote_addr, &protocol)
394 .await;
395 }
396 WbFunction::Disconnected(client_id, remote_addr) => {
397 worterbuch.disconnected(client_id, remote_addr).await.ok();
398 }
399 WbFunction::Config(tx) => {
400 tx.send(worterbuch.config().clone()).ok();
401 }
402 WbFunction::Export(tx, span) => {
403 let _ = span.enter();
404 worterbuch.export_for_persistence(tx);
405 }
406 WbFunction::Import(_, tx) => {
407 tx.send(Err(WorterbuchError::NotLeader)).ok();
408 }
409 WbFunction::Len(tx) => {
410 tx.send(worterbuch.len()).ok();
411 }
412 }
413}
414
415async fn run_in_regular_mode(
416 subsys: SubsystemHandle,
417 mut worterbuch: Worterbuch,
418 mut api_rx: mpsc::Receiver<WbFunction>,
419 config: Config,
420 web_server: Option<ServerSubsystem>,
421 tcp_server: Option<ServerSubsystem>,
422 unix_socket: Option<ServerSubsystem>,
423) -> Result<()> {
424 loop {
425 select! {
426 recv = api_rx.recv() => match recv {
427 Some(function) => process_api_call(&mut worterbuch, function).await,
428 None => break,
429 },
430 _ = subsys.on_shutdown_requested() => break,
431 }
432 }
433
434 shutdown(
435 subsys,
436 &mut worterbuch,
437 config,
438 web_server,
439 tcp_server,
440 unix_socket,
441 )
442 .await
443}
444
445async fn run_in_leader_mode(
446 subsys: SubsystemHandle,
447 mut worterbuch: Worterbuch,
448 mut api_rx: mpsc::Receiver<WbFunction>,
449 config: Config,
450 web_server: Option<ServerSubsystem>,
451 tcp_server: Option<ServerSubsystem>,
452 unix_socket: Option<ServerSubsystem>,
453) -> Result<()> {
454 info!("Running in LEADER mode.");
455
456 shutdown_on_stdin_close(&subsys);
457
458 worterbuch
459 .set(
460 topic!(SYSTEM_TOPIC_ROOT, SYSTEM_TOPIC_MODE),
461 json!(Mode::Leader),
462 INTERNAL_CLIENT_ID,
463 )
464 .await?;
465
466 let mut client_write_txs: Vec<(usize, mpsc::Sender<ClientWriteCommand>)> = vec![];
467 let (follower_connected_tx, mut follower_connected_rx) = mpsc::channel::<
468 oneshot::Sender<(StateSync, mpsc::Receiver<ClientWriteCommand>)>,
469 >(config.channel_buffer_size);
470
471 let mut tx_id = 0;
472 let mut dead = vec![];
473
474 let cfg = config.clone();
475 subsys.start(SubsystemBuilder::new("cluster_sync_port", move |s| {
476 run_cluster_sync_port(s, cfg, follower_connected_tx)
477 }));
478
479 let (mut grave_goods_rx, _) = worterbuch
480 .psubscribe(
481 INTERNAL_CLIENT_ID,
482 0,
483 topic!(
484 SYSTEM_TOPIC_ROOT,
485 SYSTEM_TOPIC_CLIENTS,
486 KeySegment::Wildcard,
487 SYSTEM_TOPIC_GRAVE_GOODS
488 ),
489 true,
490 false,
491 )
492 .await?;
493 let (mut last_will_rx, _) = worterbuch
494 .psubscribe(
495 INTERNAL_CLIENT_ID,
496 0,
497 topic!(
498 SYSTEM_TOPIC_ROOT,
499 SYSTEM_TOPIC_CLIENTS,
500 KeySegment::Wildcard,
501 SYSTEM_TOPIC_LAST_WILL
502 ),
503 true,
504 false,
505 )
506 .await?;
507
508 loop {
509 select! {
510 recv = grave_goods_rx.recv() => if let Some(e) = recv {
511 debug!("Forwarding grave goods change: {e:?}");
512 match e {
513 PStateEvent::KeyValuePairs(kvps) => {
514 for kvp in kvps {
515 let span = span!(Level::DEBUG, "forward_grave_goods");
516 forward_api_call(&mut client_write_txs, &mut dead, &WbFunction::Set(kvp.key, kvp.value, INTERNAL_CLIENT_ID, oneshot::channel().0, span), false).await;
517 }
518 },
519 PStateEvent::Deleted(kvps) => {
520 for kvp in kvps {
521 forward_api_call(&mut client_write_txs, &mut dead, &WbFunction::Delete(kvp.key, INTERNAL_CLIENT_ID, oneshot::channel().0), false).await;
522 }
523 },
524 }
525 },
526 recv = last_will_rx.recv() => if let Some(e) = recv {
527 debug!("Forwarding last will change: {e:?}");
528 match e {
529 PStateEvent::KeyValuePairs(kvps) => {
530 for kvp in kvps {
531 let span = span!(Level::DEBUG, "forward_last_will");
532 forward_api_call(&mut client_write_txs, &mut dead, &WbFunction::Set(kvp.key, kvp.value, INTERNAL_CLIENT_ID, oneshot::channel().0, span), false).await;
533 }
534 },
535 PStateEvent::Deleted(kvps) => {
536 for kvp in kvps {
537 forward_api_call(&mut client_write_txs, &mut dead, &WbFunction::Delete(kvp.key, INTERNAL_CLIENT_ID, oneshot::channel().0), false).await;
538 }
539 },
540 }
541 },
542 recv = api_rx.recv() => match recv {
543 Some(WbFunction::Import(json, tx)) => {
544 let (tx_int, rx_int) = oneshot::channel();
545 process_api_call(&mut worterbuch, WbFunction::Import(json, tx_int)).await;
546 let imported_values = rx_int.await.into_diagnostic()??;
547
548 for (key, (value, changed)) in &imported_values {
549 if *changed {
550 let cmd = match value.to_owned() {
551 ValueEntry::Cas(value, version) => ClientWriteCommand::CSet(key.to_owned(), value, version),
552 ValueEntry::Plain(value) => ClientWriteCommand::Set(key.to_owned(), value),
553 };
554 forward_to_followers(cmd, &mut client_write_txs, &mut dead).await;
555 }
556 }
557 tx.send(Ok(imported_values)).ok();
558 },
559 Some(function) => {
560 forward_api_call(&mut client_write_txs, &mut dead, &function, true).await;
561 process_api_call(&mut worterbuch, function).await;
562 },
563 None => break,
564 },
565 recv = follower_connected_rx.recv() => match recv {
566 Some(state_tx) => {
567 let (client_write_tx, client_write_rx) = mpsc::channel(config.channel_buffer_size);
568 let (current_state, grave_goods, last_will) = worterbuch.export();
569 if state_tx.send((StateSync(current_state, grave_goods, last_will), client_write_rx)).is_ok() {
570 client_write_txs.push((tx_id, client_write_tx));
571 tx_id += 1;
572 }
573 },
574 None => break,
575 },
576 _ = subsys.on_shutdown_requested() => break,
577 }
578 }
579
580 shutdown(
581 subsys,
582 &mut worterbuch,
583 config,
584 web_server,
585 tcp_server,
586 unix_socket,
587 )
588 .await
589}
590
591async fn run_in_follower_mode(
592 subsys: SubsystemHandle,
593 worterbuch: &mut Worterbuch,
594 mut api_rx: mpsc::Receiver<WbFunction>,
595 config: Config,
596 web_server: Option<ServerSubsystem>,
597) -> Result<()> {
598 let leader_addr = if let Some(it) = &config.leader_address {
599 it
600 } else {
601 return Err(miette!("No valid leader address configured."));
602 };
603 info!("Running in FOLLOWER mode. Leader: {}", leader_addr,);
604
605 shutdown_on_stdin_close(&subsys);
606
607 worterbuch
608 .set(
609 topic!(SYSTEM_TOPIC_ROOT, SYSTEM_TOPIC_MODE),
610 json!(Mode::Follower),
611 INTERNAL_CLIENT_ID,
612 )
613 .await?;
614
615 let mut persistence_interval = config.persistence_interval();
616
617 let stream = TcpStream::connect(leader_addr).await.into_diagnostic()?;
618
619 let mut lines = BufReader::new(stream).lines();
620
621 info!("Waiting for initial sync message from leader …");
622 select! {
623 recv = receive_msg(&mut lines) => match recv {
624 Ok(Some(msg)) => {
625 if let LeaderSyncMessage::Init(state) = msg {
626 initial_sync(state, worterbuch).await?;
627 PERSISTENCE_LOCKED.store(false, Ordering::Release);
628 persistence_interval.reset();
629 persistence::synchronous(worterbuch, &config).await?;
630 } else {
631 return Err(miette!("first message from leader is supposed to be the initial sync, but it wasn't"));
632 }
633 },
634 Ok(None) => return Err(miette!("connection to leader closed before initial sync")),
635 Err(e) => {
636 return Err(miette!("error receiving update from leader: {e}"));
637 }
638 },
639 _ = subsys.on_shutdown_requested() => return Err(miette!("shut down before initial sync")),
640 }
641 info!("Successfully synced with leader.");
642
643 loop {
644 select! {
645 recv = receive_msg(&mut lines) => match recv {
646 Ok(Some(msg)) => process_leader_message(msg, worterbuch).await?,
647 Ok(None) => break,
648 Err(e) => {
649 error!("Error receiving update from leader: {e}");
650 break;
651 }
652 },
653 recv = api_rx.recv() => match recv {
654 Some(function) => process_api_call_as_follower(worterbuch, function).await,
655 None => break,
656 },
657 _ = persistence_interval.tick() => {
658 debug!("Follower persistence interval triggered");
659 persistence::synchronous(worterbuch, &config).await?;
660 },
661 _ = subsys.on_shutdown_requested() => break,
662 }
663 }
664
665 shutdown(subsys, worterbuch, config, web_server, None, None).await
666}
667
668async fn shutdown(
669 subsys: SubsystemHandle,
670 worterbuch: &mut Worterbuch,
671 config: Config,
672 web_server: Option<ServerSubsystem>,
673 tcp_server: Option<ServerSubsystem>,
674 unix_socket: Option<ServerSubsystem>,
675) -> Result<()> {
676 info!("Shutdown sequence triggered");
677
678 subsys.request_shutdown();
679
680 shutdown_servers(web_server, tcp_server, unix_socket).await;
681
682 if config.use_persistence {
683 info!("Applying grave goods and last wills …");
684 worterbuch.apply_all_grave_goods_and_last_wills().await;
685 info!("Waiting for persistence hook to complete …");
686 persistence::synchronous(worterbuch, &config).await?;
687 info!("Shutdown persistence hook complete.");
688 }
689
690 Ok(())
691}
692
693async fn shutdown_servers(
694 web_server: Option<ServerSubsystem>,
695 tcp_server: Option<ServerSubsystem>,
696 unix_socket: Option<ServerSubsystem>,
697) {
698 if let Some(it) = web_server {
699 info!("Shutting down web server …");
700 it.initiate_shutdown();
701 if let Err(e) = it.join().await {
702 error!("Error waiting for web server to shut down: {e}");
703 }
704 }
705
706 if let Some(it) = tcp_server {
707 info!("Shutting down tcp server …");
708 it.initiate_shutdown();
709 if let Err(e) = it.join().await {
710 error!("Error waiting for tcp server to shut down: {e}");
711 }
712 }
713
714 if let Some(it) = unix_socket {
715 info!("Shutting down unix socket …");
716 it.initiate_shutdown();
717 if let Err(e) = it.join().await {
718 error!("Error waiting for unix socket to shut down: {e}");
719 }
720 }
721}
722
723async fn forward_api_call(
724 client_write_txs: &mut Vec<(usize, mpsc::Sender<ClientWriteCommand>)>,
725 dead: &mut Vec<usize>,
726 function: &WbFunction,
727 filter_sys: bool,
728) {
729 if let Some(cmd) = match function {
730 WbFunction::Get(_, _)
731 | WbFunction::CGet(_, _)
732 | WbFunction::SPubInit(_, _, _, _)
733 | WbFunction::SPub(_, _, _, _)
734 | WbFunction::Publish(_, _, _)
735 | WbFunction::Ls(_, _)
736 | WbFunction::PLs(_, _)
737 | WbFunction::PGet(_, _)
738 | WbFunction::Subscribe(_, _, _, _, _, _)
739 | WbFunction::PSubscribe(_, _, _, _, _, _)
740 | WbFunction::SubscribeLs(_, _, _, _)
741 | WbFunction::Unsubscribe(_, _, _)
742 | WbFunction::UnsubscribeLs(_, _, _)
743 | WbFunction::Connected(_, _, _)
744 | WbFunction::Disconnected(_, _)
745 | WbFunction::Config(_)
746 | WbFunction::Export(_, _)
747 | WbFunction::Import(_, _)
748 | WbFunction::Len(_)
749 | WbFunction::Lock(_, _, _)
750 | WbFunction::AcquireLock(_, _, _)
751 | WbFunction::ReleaseLock(_, _, _) => None,
752 WbFunction::Set(key, value, _, _, _) => {
753 if !filter_sys || !key.starts_with(SYSTEM_TOPIC_ROOT_PREFIX) {
754 Some(ClientWriteCommand::Set(key.to_owned(), value.to_owned()))
755 } else {
756 None
757 }
758 }
759 WbFunction::CSet(key, value, version, _, _) => {
760 if !filter_sys || !key.starts_with(SYSTEM_TOPIC_ROOT_PREFIX) {
761 Some(ClientWriteCommand::CSet(
762 key.to_owned(),
763 value.to_owned(),
764 version.to_owned(),
765 ))
766 } else {
767 None
768 }
769 }
770 WbFunction::Delete(key, _, _) => {
771 if !filter_sys || !key.starts_with(SYSTEM_TOPIC_ROOT_PREFIX) {
772 Some(ClientWriteCommand::Delete(key.to_owned()))
773 } else {
774 None
775 }
776 }
777 WbFunction::PDelete(pattern, _, _) => {
778 if !filter_sys || !pattern.starts_with(SYSTEM_TOPIC_ROOT_PREFIX) {
779 Some(ClientWriteCommand::PDelete(pattern.to_owned()))
780 } else {
781 None
782 }
783 }
784 } {
785 forward_to_followers(cmd, client_write_txs, dead).await;
786 }
787}
788
789async fn forward_to_followers(
790 cmd: ClientWriteCommand,
791 client_write_txs: &mut Vec<(usize, mpsc::Sender<ClientWriteCommand>)>,
792 dead: &mut Vec<usize>,
793) {
794 for (id, tx) in client_write_txs.iter() {
795 if tx.send(cmd.clone()).await.is_err() {
796 dead.push(*id);
797 }
798 }
799 if !dead.is_empty() {
800 client_write_txs.retain(|(i, _)| !dead.contains(i));
801 dead.clear();
802 }
803}