1mod auth;
29mod config;
30pub mod error;
31mod leader_follower;
32pub(crate) mod license;
33#[cfg(not(feature = "telemetry"))]
34pub mod logging;
35mod mem_tools;
36mod persistence;
37pub mod server;
38mod stats;
39pub(crate) mod store;
40mod subscribers;
41#[cfg(feature = "telemetry")]
42pub mod telemetry;
43mod worterbuch;
44
45pub use config::*;
46pub use worterbuch_common as common;
47
48use crate::{
49 error::{WorterbuchAppError, WorterbuchAppResult},
50 persistence::unlock_persistence,
51 server::{CloneableWbApi, common::SUPPORTED_PROTOCOL_VERSIONS},
52 stats::track_stats,
53 worterbuch::Worterbuch,
54};
55use common::{
56 KeySegment, PStateEvent, SYSTEM_TOPIC_CLIENTS, SYSTEM_TOPIC_GRAVE_GOODS,
57 SYSTEM_TOPIC_LAST_WILL, SYSTEM_TOPIC_MODE, SYSTEM_TOPIC_ROOT, SYSTEM_TOPIC_ROOT_PREFIX,
58 SYSTEM_TOPIC_SUPPORTED_PROTOCOL_VERSION, Value, error::WorterbuchError, receive_msg, topic,
59};
60use leader_follower::{
61 ClientWriteCommand, LeaderSyncMessage, Mode, StateSync, initial_sync, process_leader_message,
62 run_cluster_sync_port, shutdown_on_stdin_close,
63};
64use serde_json::json;
65use server::common::WbFunction;
66use std::error::Error;
67use tokio::{
68 io::{AsyncBufReadExt, BufReader},
69 net::TcpStream,
70 select,
71 sync::{mpsc, oneshot},
72};
73use tokio_graceful_shutdown::{SubsystemBuilder, SubsystemHandle};
74use tracing::{Instrument, Level, debug, error, info, span};
75use worterbuch_common::{INTERNAL_CLIENT_ID, ValueEntry};
76
77type ServerSubsystem = tokio_graceful_shutdown::NestedSubsystem<Box<dyn Error + Send + Sync>>;
78
79pub async fn spawn_worterbuch(
80 subsys: &SubsystemHandle,
81 config: Config,
82) -> WorterbuchAppResult<CloneableWbApi> {
83 let (api_tx, api_rx) = oneshot::channel();
84 subsys.start(SubsystemBuilder::new(
85 "worterbuch",
86 async |s: &mut SubsystemHandle| do_run_worterbuch(s, config, Some(api_tx)).await,
87 ));
88 Ok(api_rx.await?)
89}
90
91pub async fn run_worterbuch(
92 subsys: &mut SubsystemHandle,
93 config: Config,
94) -> WorterbuchAppResult<()> {
95 do_run_worterbuch(subsys, config, None).await?;
96 Ok(())
97}
98
99async fn do_run_worterbuch(
100 subsys: &mut SubsystemHandle,
101 config: Config,
102 tx: Option<oneshot::Sender<CloneableWbApi>>,
103) -> WorterbuchAppResult<()> {
104 let channel_buffer_size = config.channel_buffer_size;
105 let (api_tx, api_rx) = mpsc::channel(channel_buffer_size);
106 let api = CloneableWbApi::new(api_tx, config.clone());
107
108 if let Some(tx) = tx {
109 tx.send(api.clone()).ok();
110 }
111
112 let mut worterbuch = persistence::restore(subsys, &config, &api).await?;
113
114 let web_server = if let Some(WsEndpoint {
115 endpoint: Endpoint {
116 tls,
117 bind_addr,
118 port,
119 },
120 public_addr,
121 }) = &config.ws_endpoint
122 {
123 let sapi = api.clone();
124 let tls = tls.to_owned();
125 let bind_addr = bind_addr.to_owned();
126 let port = port.to_owned();
127 let public_addr = public_addr.to_owned();
128 let ws_enabled = !config.follower;
129 Some(subsys.start(SubsystemBuilder::new(
130 "webserver",
131 async move |subsys: &mut SubsystemHandle| {
132 server::axum::start(sapi, tls, bind_addr, port, public_addr, subsys, ws_enabled)
133 .await
134 },
135 )))
136 } else {
137 None
138 };
139
140 if config.follower {
141 run_in_follower_mode(subsys, worterbuch, api_rx, config, web_server).await?;
142 } else {
143 worterbuch
144 .set(
145 topic!(SYSTEM_TOPIC_ROOT, SYSTEM_TOPIC_SUPPORTED_PROTOCOL_VERSION),
146 serde_json::to_value(SUPPORTED_PROTOCOL_VERSIONS)
147 .unwrap_or_else(|e| Value::String(format!("Error serializing version: {e}"))),
148 INTERNAL_CLIENT_ID,
149 )
150 .await?;
151
152 let worterbuch_uptime = api.clone();
153 subsys.start(SubsystemBuilder::new(
154 "stats",
155 async |subsys: &mut SubsystemHandle| track_stats(worterbuch_uptime, subsys).await,
156 ));
157
158 let cfg = config.clone();
159 let tcp_server = if let Some(Endpoint {
160 tls: _,
161 bind_addr,
162 port,
163 }) = &config.tcp_endpoint
164 {
165 let sapi = api.clone();
166 let bind_addr = bind_addr.to_owned();
167 let port = port.to_owned();
168 Some(subsys.start(SubsystemBuilder::new(
169 "tcpserver",
170 async move |subsys: &mut SubsystemHandle| {
171 server::tcp::start(sapi, cfg, bind_addr, port, subsys).await
172 },
173 )))
174 } else {
175 None
176 };
177
178 #[cfg(target_family = "unix")]
179 let unix_socket = if let Some(UnixEndpoint { path }) = &config.unix_endpoint {
180 let sapi = api.clone();
181 let path = path.clone();
182 Some(subsys.start(SubsystemBuilder::new(
183 "unixsocket",
184 async move |subsys: &mut SubsystemHandle| {
185 server::unix::start(sapi, path, subsys).await
186 },
187 )))
188 } else {
189 None
190 };
191
192 #[cfg(not(target_family = "unix"))]
193 let unix_socket = None;
194
195 if config.leader {
196 run_in_leader_mode(
197 subsys,
198 worterbuch,
199 api_rx,
200 config,
201 web_server,
202 tcp_server,
203 unix_socket,
204 )
205 .await?;
206 } else {
207 run_in_regular_mode(
208 subsys,
209 worterbuch,
210 api_rx,
211 config,
212 web_server,
213 tcp_server,
214 unix_socket,
215 )
216 .await?;
217 }
218 }
219
220 debug!("worterbuch subsystem completed.");
221
222 Ok(())
223}
224
225async fn process_api_call(worterbuch: &mut Worterbuch, function: WbFunction) {
226 match function {
227 WbFunction::Get(key, tx) => {
228 tx.send(worterbuch.get(&key)).ok();
229 }
230 WbFunction::CGet(key, tx) => {
231 tx.send(worterbuch.cget(&key)).ok();
232 }
233 WbFunction::Set(key, value, client_id, tx, span) => {
234 tx.send(worterbuch.set(key, value, client_id).instrument(span).await)
235 .ok();
236 }
237 WbFunction::CSet(key, value, version, client_id, tx) => {
238 tx.send(worterbuch.cset(key, value, version, client_id).await)
239 .ok();
240 }
241 WbFunction::SPubInit(transaction_id, key, client_id, tx) => {
242 tx.send(worterbuch.spub_init(transaction_id, key, client_id).await)
243 .ok();
244 }
245 WbFunction::SPub(transaction_id, value, client_id, tx) => {
246 tx.send(worterbuch.spub(transaction_id, value, client_id).await)
247 .ok();
248 }
249 WbFunction::Publish(key, value, tx) => {
250 tx.send(worterbuch.publish(key, value).await).ok();
251 }
252 WbFunction::Ls(parent, tx) => {
253 tx.send(worterbuch.ls(&parent)).ok();
254 }
255 WbFunction::PLs(parent, tx) => {
256 tx.send(worterbuch.pls(&parent)).ok();
257 }
258 WbFunction::PGet(pattern, tx) => {
259 tx.send(worterbuch.pget(&pattern)).ok();
260 }
261 WbFunction::Subscribe(client_id, transaction_id, key, unique, live_only, tx) => {
262 tx.send(
263 worterbuch
264 .subscribe(client_id, transaction_id, key, unique, live_only)
265 .await,
266 )
267 .ok();
268 }
269 WbFunction::PSubscribe(client_id, transaction_id, pattern, unique, live_only, tx) => {
270 tx.send(
271 worterbuch
272 .psubscribe(client_id, transaction_id, pattern, unique, live_only)
273 .await,
274 )
275 .ok();
276 }
277 WbFunction::SubscribeLs(client_id, transaction_id, parent, tx) => {
278 tx.send(
279 worterbuch
280 .subscribe_ls(client_id, transaction_id, parent)
281 .await,
282 )
283 .ok();
284 }
285 WbFunction::Unsubscribe(client_id, transaction_id, tx) => {
286 tx.send(worterbuch.unsubscribe(client_id, transaction_id).await)
287 .ok();
288 }
289 WbFunction::UnsubscribeLs(client_id, transaction_id, tx) => {
290 tx.send(worterbuch.unsubscribe_ls(client_id, transaction_id))
291 .ok();
292 }
293 WbFunction::Delete(key, client_id, tx) => {
294 tx.send(worterbuch.delete(key, client_id).await).ok();
295 }
296 WbFunction::PDelete(pattern, client_id, tx) => {
297 tx.send(worterbuch.pdelete(pattern, client_id).await).ok();
298 }
299 WbFunction::Lock(key, client_id, tx) => {
300 tx.send(worterbuch.lock(key, client_id).await).ok();
301 }
302 WbFunction::AcquireLock(key, client_id, tx) => {
303 tx.send(worterbuch.acquire_lock(key, client_id).await).ok();
304 }
305 WbFunction::ReleaseLock(key, client_id, tx) => {
306 tx.send(worterbuch.release_lock(key, client_id).await).ok();
307 }
308 WbFunction::Connected(client_id, remote_addr, protocol, tx) => {
309 let res = worterbuch
310 .connected(client_id, remote_addr, &protocol)
311 .await;
312 tx.send(res).ok();
313 }
314 WbFunction::ProtocolSwitched(client_id, protocol) => {
315 worterbuch.protocol_switched(client_id, protocol).await;
316 }
317 WbFunction::Disconnected(client_id, remote_addr) => {
318 worterbuch.disconnected(client_id, remote_addr).await.ok();
319 }
320 WbFunction::Config(tx) => {
321 tx.send(worterbuch.config().clone()).ok();
322 }
323 WbFunction::Export(tx, span) => {
324 let g = span.enter();
325 worterbuch.export_for_persistence(tx);
326 drop(g);
327 drop(span);
328 }
329 WbFunction::Import(json, tx) => {
330 tx.send(worterbuch.import(&json).await).ok();
331 }
332 WbFunction::Len(tx) => {
333 tx.send(worterbuch.len()).ok();
334 }
335 }
336}
337
338async fn process_api_call_as_follower(worterbuch: &mut Worterbuch, function: WbFunction) {
339 match function {
340 WbFunction::Get(key, tx) => {
341 tx.send(worterbuch.get(&key)).ok();
342 }
343 WbFunction::CGet(key, tx) => {
344 tx.send(worterbuch.cget(&key)).ok();
345 }
346 WbFunction::Set(_, _, _, tx, _) => {
347 tx.send(Err(WorterbuchError::NotLeader)).ok();
348 }
349 WbFunction::CSet(_, _, _, _, tx) => {
350 tx.send(Err(WorterbuchError::NotLeader)).ok();
351 }
352 WbFunction::SPubInit(_, _, _, tx) => {
353 tx.send(Err(WorterbuchError::NotLeader)).ok();
354 }
355 WbFunction::SPub(_, _, _, tx) => {
356 tx.send(Err(WorterbuchError::NotLeader)).ok();
357 }
358 WbFunction::Publish(_, _, tx) => {
359 tx.send(Err(WorterbuchError::NotLeader)).ok();
360 }
361 WbFunction::Ls(parent, tx) => {
362 tx.send(worterbuch.ls(&parent)).ok();
363 }
364 WbFunction::PLs(parent, tx) => {
365 tx.send(worterbuch.pls(&parent)).ok();
366 }
367 WbFunction::PGet(pattern, tx) => {
368 tx.send(worterbuch.pget(&pattern)).ok();
369 }
370 WbFunction::Subscribe(client_id, transaction_id, key, unique, live_only, tx) => {
371 tx.send(
372 worterbuch
373 .subscribe(client_id, transaction_id, key, unique, live_only)
374 .await,
375 )
376 .ok();
377 }
378 WbFunction::PSubscribe(client_id, transaction_id, pattern, unique, live_only, tx) => {
379 tx.send(
380 worterbuch
381 .psubscribe(client_id, transaction_id, pattern, unique, live_only)
382 .await,
383 )
384 .ok();
385 }
386 WbFunction::SubscribeLs(client_id, transaction_id, parent, tx) => {
387 tx.send(
388 worterbuch
389 .subscribe_ls(client_id, transaction_id, parent)
390 .await,
391 )
392 .ok();
393 }
394 WbFunction::Unsubscribe(client_id, transaction_id, tx) => {
395 tx.send(worterbuch.unsubscribe(client_id, transaction_id).await)
396 .ok();
397 }
398 WbFunction::UnsubscribeLs(client_id, transaction_id, tx) => {
399 tx.send(worterbuch.unsubscribe_ls(client_id, transaction_id))
400 .ok();
401 }
402 WbFunction::Lock(_, _, tx) => {
403 tx.send(Err(WorterbuchError::NotLeader)).ok();
404 }
405 WbFunction::AcquireLock(_, _, tx) => {
406 tx.send(Err(WorterbuchError::NotLeader)).ok();
407 }
408 WbFunction::ReleaseLock(_, _, tx) => {
409 tx.send(Err(WorterbuchError::NotLeader)).ok();
410 }
411 WbFunction::Delete(_, _, tx) => {
412 tx.send(Err(WorterbuchError::NotLeader)).ok();
413 }
414 WbFunction::PDelete(_, _, tx) => {
415 tx.send(Err(WorterbuchError::NotLeader)).ok();
416 }
417 WbFunction::Connected(client_id, remote_addr, protocol, tx) => {
418 let res = worterbuch
419 .connected(client_id, remote_addr, &protocol)
420 .await;
421 tx.send(res).ok();
422 }
423 WbFunction::ProtocolSwitched(client_id, protocol) => {
424 worterbuch.protocol_switched(client_id, protocol).await;
425 }
426 WbFunction::Disconnected(client_id, remote_addr) => {
427 worterbuch.disconnected(client_id, remote_addr).await.ok();
428 }
429 WbFunction::Config(tx) => {
430 tx.send(worterbuch.config().clone()).ok();
431 }
432 WbFunction::Export(tx, span) => {
433 _ = span.enter();
434 worterbuch.export_for_persistence(tx);
435 }
436 WbFunction::Import(_, tx) => {
437 tx.send(Err(WorterbuchError::NotLeader)).ok();
438 }
439 WbFunction::Len(tx) => {
440 tx.send(worterbuch.len()).ok();
441 }
442 }
443}
444
445async fn run_in_regular_mode(
446 subsys: &SubsystemHandle,
447 mut worterbuch: Worterbuch,
448 mut api_rx: mpsc::Receiver<WbFunction>,
449 config: Config,
450 web_server: Option<ServerSubsystem>,
451 tcp_server: Option<ServerSubsystem>,
452 unix_socket: Option<ServerSubsystem>,
453) -> WorterbuchAppResult<()> {
454 loop {
455 select! {
456 recv = api_rx.recv() => match recv {
457 Some(function) => process_api_call(&mut worterbuch, function).await,
458 None => break,
459 },
460 _ = subsys.on_shutdown_requested() => break,
461 }
462 }
463
464 shutdown(
465 subsys,
466 worterbuch,
467 config,
468 web_server,
469 tcp_server,
470 unix_socket,
471 )
472 .await
473}
474
475async fn run_in_leader_mode(
476 subsys: &SubsystemHandle,
477 mut worterbuch: Worterbuch,
478 mut api_rx: mpsc::Receiver<WbFunction>,
479 config: Config,
480 web_server: Option<ServerSubsystem>,
481 tcp_server: Option<ServerSubsystem>,
482 unix_socket: Option<ServerSubsystem>,
483) -> WorterbuchAppResult<()> {
484 info!("Running in LEADER mode.");
485
486 shutdown_on_stdin_close(subsys);
487
488 worterbuch
489 .set(
490 topic!(SYSTEM_TOPIC_ROOT, SYSTEM_TOPIC_MODE),
491 json!(Mode::Leader),
492 INTERNAL_CLIENT_ID,
493 )
494 .await?;
495
496 let mut client_write_txs: Vec<(usize, mpsc::Sender<ClientWriteCommand>)> = vec![];
497 let (follower_connected_tx, mut follower_connected_rx) = mpsc::channel::<
498 oneshot::Sender<(StateSync, mpsc::Receiver<ClientWriteCommand>)>,
499 >(config.channel_buffer_size);
500
501 let mut tx_id = 0;
502 let mut dead = vec![];
503
504 let cfg = config.clone();
505 subsys.start(SubsystemBuilder::new(
506 "cluster_sync_port",
507 async move |s: &mut SubsystemHandle| {
508 run_cluster_sync_port(s, cfg, follower_connected_tx).await
509 },
510 ));
511
512 let (mut grave_goods_rx, _) = worterbuch
513 .psubscribe(
514 INTERNAL_CLIENT_ID,
515 0,
516 topic!(
517 SYSTEM_TOPIC_ROOT,
518 SYSTEM_TOPIC_CLIENTS,
519 KeySegment::Wildcard,
520 SYSTEM_TOPIC_GRAVE_GOODS
521 ),
522 true,
523 false,
524 )
525 .await?;
526 let (mut last_will_rx, _) = worterbuch
527 .psubscribe(
528 INTERNAL_CLIENT_ID,
529 0,
530 topic!(
531 SYSTEM_TOPIC_ROOT,
532 SYSTEM_TOPIC_CLIENTS,
533 KeySegment::Wildcard,
534 SYSTEM_TOPIC_LAST_WILL
535 ),
536 true,
537 false,
538 )
539 .await?;
540
541 loop {
542 select! {
543 recv = grave_goods_rx.recv() => if let Some(e) = recv {
544 debug!("Forwarding grave goods change: {e:?}");
545 match e {
546 PStateEvent::KeyValuePairs(kvps) => {
547 for kvp in kvps {
548 let span = span!(Level::DEBUG, "forward_grave_goods");
549 forward_api_call(&mut client_write_txs, &mut dead, &WbFunction::Set(kvp.key, kvp.value, INTERNAL_CLIENT_ID, oneshot::channel().0, span), false).await;
550 }
551 },
552 PStateEvent::Deleted(kvps) => {
553 for kvp in kvps {
554 forward_api_call(&mut client_write_txs, &mut dead, &WbFunction::Delete(kvp.key, INTERNAL_CLIENT_ID, oneshot::channel().0), false).await;
555 }
556 },
557 }
558 },
559 recv = last_will_rx.recv() => if let Some(e) = recv {
560 debug!("Forwarding last will change: {e:?}");
561 match e {
562 PStateEvent::KeyValuePairs(kvps) => {
563 for kvp in kvps {
564 let span = span!(Level::DEBUG, "forward_last_will");
565 forward_api_call(&mut client_write_txs, &mut dead, &WbFunction::Set(kvp.key, kvp.value, INTERNAL_CLIENT_ID, oneshot::channel().0, span), false).await;
566 }
567 },
568 PStateEvent::Deleted(kvps) => {
569 for kvp in kvps {
570 forward_api_call(&mut client_write_txs, &mut dead, &WbFunction::Delete(kvp.key, INTERNAL_CLIENT_ID, oneshot::channel().0), false).await;
571 }
572 },
573 }
574 },
575 recv = api_rx.recv() => match recv {
576 Some(WbFunction::Import(json, tx)) => {
577 let (tx_int, rx_int) = oneshot::channel();
578 process_api_call(&mut worterbuch, WbFunction::Import(json, tx_int)).await;
579 let imported_values = rx_int.await??;
580
581 for (key, (value, changed)) in &imported_values {
582 if *changed {
583 let cmd = match value.to_owned() {
584 ValueEntry::Cas(value, version) => ClientWriteCommand::CSet(key.to_owned(), value, version),
585 ValueEntry::Plain(value) => ClientWriteCommand::Set(key.to_owned(), value),
586 };
587 forward_to_followers(cmd, &mut client_write_txs, &mut dead).await;
588 }
589 }
590 tx.send(Ok(imported_values)).ok();
591 },
592 Some(function) => {
593 forward_api_call(&mut client_write_txs, &mut dead, &function, true).await;
594 process_api_call(&mut worterbuch, function).await;
595 },
596 None => break,
597 },
598 recv = follower_connected_rx.recv() => match recv {
599 Some(state_tx) => {
600 let (client_write_tx, client_write_rx) = mpsc::channel(config.channel_buffer_size);
601 let (current_state, grave_goods, last_will) = worterbuch.export();
602 if state_tx.send((StateSync(current_state, grave_goods, last_will), client_write_rx)).is_ok() {
603 client_write_txs.push((tx_id, client_write_tx));
604 tx_id += 1;
605 }
606 },
607 None => break,
608 },
609 _ = subsys.on_shutdown_requested() => break,
610 }
611 }
612
613 shutdown(
614 subsys,
615 worterbuch,
616 config,
617 web_server,
618 tcp_server,
619 unix_socket,
620 )
621 .await
622}
623
624async fn run_in_follower_mode(
625 subsys: &SubsystemHandle,
626 mut worterbuch: Worterbuch,
627 mut api_rx: mpsc::Receiver<WbFunction>,
628 config: Config,
629 web_server: Option<ServerSubsystem>,
630) -> WorterbuchAppResult<()> {
631 let leader_addr = if let Some(it) = &config.leader_address {
632 it
633 } else {
634 return Err(WorterbuchAppError::ConfigError(
635 "No valid leader address configured.".to_owned(),
636 ));
637 };
638 info!("Running in FOLLOWER mode. Leader: {}", leader_addr,);
639
640 shutdown_on_stdin_close(subsys);
641
642 worterbuch
643 .set(
644 topic!(SYSTEM_TOPIC_ROOT, SYSTEM_TOPIC_MODE),
645 json!(Mode::Follower),
646 INTERNAL_CLIENT_ID,
647 )
648 .await?;
649
650 let mut persistence_interval = config.persistence_interval();
651
652 let stream = TcpStream::connect(leader_addr).await?;
653
654 let mut lines = BufReader::new(stream).lines();
655
656 info!("Waiting for initial sync message from leader …");
657 select! {
658 recv = receive_msg(&mut lines) => match recv {
659 Ok(Some(msg)) => {
660 if let LeaderSyncMessage::Init(state) = msg {
661 initial_sync(state, &mut worterbuch).await?;
662 unlock_persistence();
663 persistence_interval.reset();
664 worterbuch.flush().await?;
665 } else {
666 return Err(WorterbuchAppError::ClusterError("first message from leader is supposed to be the initial sync, but it wasn't".to_owned()));
667 }
668 },
669 Ok(None) => return Err(WorterbuchAppError::ClusterError("connection to leader closed before initial sync".to_owned())),
670 Err(e) => {
671 return Err(WorterbuchAppError::ClusterError(format!("error receiving update from leader: {e}")));
672 }
673 },
674 _ = subsys.on_shutdown_requested() => return Err(WorterbuchAppError::ClusterError("shut down before initial sync".to_owned())),
675 }
676 info!("Successfully synced with leader.");
677
678 loop {
679 select! {
680 recv = receive_msg(&mut lines) => match recv {
681 Ok(Some(msg)) => process_leader_message(msg, &mut worterbuch).await?,
682 Ok(None) => break,
683 Err(e) => {
684 error!("Error receiving update from leader: {e}");
685 break;
686 }
687 },
688 recv = api_rx.recv() => match recv {
689 Some(function) => process_api_call_as_follower(&mut worterbuch, function).await,
690 None => break,
691 },
692 _ = persistence_interval.tick() => {
693 debug!("Follower persistence interval triggered");
694 worterbuch.flush().await?;
695 },
696 _ = subsys.on_shutdown_requested() => break,
697 }
698 }
699
700 shutdown(subsys, worterbuch, config, web_server, None, None).await
701}
702
703async fn shutdown(
704 subsys: &SubsystemHandle,
705 mut worterbuch: Worterbuch,
706 config: Config,
707 web_server: Option<ServerSubsystem>,
708 tcp_server: Option<ServerSubsystem>,
709 unix_socket: Option<ServerSubsystem>,
710) -> WorterbuchAppResult<()> {
711 info!("Shutdown sequence triggered");
712
713 subsys.request_shutdown();
714
715 shutdown_servers(web_server, tcp_server, unix_socket).await;
716
717 if config.use_persistence {
718 info!("Applying grave goods and last wills …");
719 worterbuch.apply_all_grave_goods_and_last_wills().await;
720 info!("Waiting for persistence hook to complete …");
721 worterbuch.flush().await?;
722 info!("Shutdown persistence hook complete.");
723 }
724
725 Ok(())
726}
727
728async fn shutdown_servers(
729 web_server: Option<ServerSubsystem>,
730 tcp_server: Option<ServerSubsystem>,
731 unix_socket: Option<ServerSubsystem>,
732) {
733 if let Some(it) = web_server {
734 info!("Shutting down web server …");
735 it.initiate_shutdown();
736 if let Err(e) = it.join().await {
737 error!("Error waiting for web server to shut down: {e}");
738 }
739 }
740
741 if let Some(it) = tcp_server {
742 info!("Shutting down tcp server …");
743 it.initiate_shutdown();
744 if let Err(e) = it.join().await {
745 error!("Error waiting for tcp server to shut down: {e}");
746 }
747 }
748
749 if let Some(it) = unix_socket {
750 info!("Shutting down unix socket …");
751 it.initiate_shutdown();
752 if let Err(e) = it.join().await {
753 error!("Error waiting for unix socket to shut down: {e}");
754 }
755 }
756}
757
758async fn forward_api_call(
759 client_write_txs: &mut Vec<(usize, mpsc::Sender<ClientWriteCommand>)>,
760 dead: &mut Vec<usize>,
761 function: &WbFunction,
762 filter_sys: bool,
763) {
764 if let Some(cmd) = match function {
765 WbFunction::Get(_, _)
766 | WbFunction::CGet(_, _)
767 | WbFunction::SPubInit(_, _, _, _)
768 | WbFunction::SPub(_, _, _, _)
769 | WbFunction::Publish(_, _, _)
770 | WbFunction::Ls(_, _)
771 | WbFunction::PLs(_, _)
772 | WbFunction::PGet(_, _)
773 | WbFunction::Subscribe(_, _, _, _, _, _)
774 | WbFunction::PSubscribe(_, _, _, _, _, _)
775 | WbFunction::SubscribeLs(_, _, _, _)
776 | WbFunction::Unsubscribe(_, _, _)
777 | WbFunction::UnsubscribeLs(_, _, _)
778 | WbFunction::Connected(_, _, _, _)
779 | WbFunction::ProtocolSwitched(_, _)
780 | WbFunction::Disconnected(_, _)
781 | WbFunction::Config(_)
782 | WbFunction::Export(_, _)
783 | WbFunction::Import(_, _)
784 | WbFunction::Len(_)
785 | WbFunction::Lock(_, _, _)
786 | WbFunction::AcquireLock(_, _, _)
787 | WbFunction::ReleaseLock(_, _, _) => None,
788 WbFunction::Set(key, value, _, _, _) => {
789 if !filter_sys || !key.starts_with(SYSTEM_TOPIC_ROOT_PREFIX) {
790 Some(ClientWriteCommand::Set(key.to_owned(), value.to_owned()))
791 } else {
792 None
793 }
794 }
795 WbFunction::CSet(key, value, version, _, _) => {
796 if !filter_sys || !key.starts_with(SYSTEM_TOPIC_ROOT_PREFIX) {
797 Some(ClientWriteCommand::CSet(
798 key.to_owned(),
799 value.to_owned(),
800 version.to_owned(),
801 ))
802 } else {
803 None
804 }
805 }
806 WbFunction::Delete(key, _, _) => {
807 if !filter_sys || !key.starts_with(SYSTEM_TOPIC_ROOT_PREFIX) {
808 Some(ClientWriteCommand::Delete(key.to_owned()))
809 } else {
810 None
811 }
812 }
813 WbFunction::PDelete(pattern, _, _) => {
814 if !filter_sys || !pattern.starts_with(SYSTEM_TOPIC_ROOT_PREFIX) {
815 Some(ClientWriteCommand::PDelete(pattern.to_owned()))
816 } else {
817 None
818 }
819 }
820 } {
821 forward_to_followers(cmd, client_write_txs, dead).await;
822 }
823}
824
825async fn forward_to_followers(
826 cmd: ClientWriteCommand,
827 client_write_txs: &mut Vec<(usize, mpsc::Sender<ClientWriteCommand>)>,
828 dead: &mut Vec<usize>,
829) {
830 for (id, tx) in client_write_txs.iter() {
831 if tx.send(cmd.clone()).await.is_err() {
832 dead.push(*id);
833 }
834 }
835 if !dead.is_empty() {
836 client_write_txs.retain(|(i, _)| !dead.contains(i));
837 dead.clear();
838 }
839}