worterbuch/
lib.rs

1/*
2 *  The worterbuch application library
3 *
4 *  Copyright (C) 2024 Michael Bachmann
5 *
6 *  This program is free software: you can redistribute it and/or modify
7 *  it under the terms of the GNU Affero General Public License as published by
8 *  the Free Software Foundation, either version 3 of the License, or
9 *  (at your option) any later version.
10 *
11 *  This program is distributed in the hope that it will be useful,
12 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 *  GNU Affero General Public License for more details.
15 *
16 *  You should have received a copy of the GNU Affero General Public License
17 *  along with this program.  If not, see <https://www.gnu.org/licenses/>.
18 */
19
20//! This library allows you to embed worterbuch into your application.
21//!
22//! Note that while it makes embedding very easy, it does leak several
23//! dependencies into your application that a proper library normally
24//! shouldn't. Worterbuch takes this liberty because it is essentially
25//! still an application. Just one that you can start from within your
26//! own application.
27
28mod auth;
29mod config;
30mod leader_follower;
31pub mod license;
32mod persistence;
33mod server;
34mod stats;
35pub mod store;
36mod subscribers;
37mod telemetry;
38mod worterbuch;
39
40use crate::stats::track_stats;
41pub use crate::worterbuch::*;
42pub use config::*;
43use leader_follower::{
44    ClientWriteCommand, LeaderSyncMessage, Mode, StateSync, initial_sync, process_leader_message,
45    run_cluster_sync_port, shutdown_on_stdin_close,
46};
47use miette::{IntoDiagnostic, Result, miette};
48use persistence::PERSISTENCE_LOCKED;
49use serde_json::{Value, json};
50use server::common::{CloneableWbApi, WbFunction};
51use std::{error::Error, sync::atomic::Ordering};
52use store::ValueEntry;
53use tokio::{
54    io::{AsyncBufReadExt, BufReader},
55    net::TcpStream,
56    select,
57    sync::{mpsc, oneshot},
58};
59use tokio_graceful_shutdown::{SubsystemBuilder, SubsystemHandle};
60use tracing::{Instrument, Level, debug, error, info, span};
61use uuid::Uuid;
62use worterbuch_common::{
63    KeySegment, PStateEvent, ProtocolVersion, SYSTEM_TOPIC_CLIENTS, SYSTEM_TOPIC_GRAVE_GOODS,
64    SYSTEM_TOPIC_LAST_WILL, SYSTEM_TOPIC_MODE, SYSTEM_TOPIC_ROOT, SYSTEM_TOPIC_ROOT_PREFIX,
65    SYSTEM_TOPIC_SUPPORTED_PROTOCOL_VERSION, error::WorterbuchError, receive_msg, topic,
66};
67
/// Protocol versions this server supports. In non-follower mode `run_worterbuch` stores this
/// list under the `SYSTEM_TOPIC_SUPPORTED_PROTOCOL_VERSION` system topic on startup.
68pub const SUPPORTED_PROTOCOL_VERSIONS: [ProtocolVersion; 2] =
69    [ProtocolVersion::new(0, 11), ProtocolVersion::new(1, 1)];
70
/// Client ID (the nil UUID) under which this library issues its own store operations, e.g.
/// setting system topics or subscribing to grave goods and last wills in leader mode.
71pub const INTERNAL_CLIENT_ID: Uuid = Uuid::nil();
72
/// Handle of a nested subsystem as returned by `SubsystemHandle::start`; used in this file for
/// the web server, TCP server and unix socket subsystems.
73type ServerSubsystem = tokio_graceful_shutdown::NestedSubsystem<Box<dyn Error + Send + Sync>>;
74
75pub async fn run_worterbuch(subsys: SubsystemHandle, config: Config) -> Result<()> {
76    let config_pers = config.clone();
77
78    let channel_buffer_size = config.channel_buffer_size;
79
80    let use_persistence = config.use_persistence;
81
82    let mut worterbuch = if use_persistence && !config.follower {
83        persistence::load(config.clone()).await?
84    } else {
85        Worterbuch::with_config(config.clone())
86    };
87
88    let (api_tx, api_rx) = mpsc::channel(channel_buffer_size);
89    let api = CloneableWbApi::new(api_tx);
90
91    let web_server = if let Some(WsEndpoint {
92        endpoint: Endpoint {
93            tls,
94            bind_addr,
95            port,
96        },
97        public_addr,
98    }) = &config.ws_endpoint
99    {
100        let sapi = api.clone();
101        let tls = tls.to_owned();
102        let bind_addr = bind_addr.to_owned();
103        let port = port.to_owned();
104        let public_addr = public_addr.to_owned();
105        let ws_enabled = !config.follower;
106        Some(
107            subsys.start(SubsystemBuilder::new("webserver", move |subsys| {
108                server::poem::start(sapi, tls, bind_addr, port, public_addr, subsys, ws_enabled)
109            })),
110        )
111    } else {
112        None
113    };
114
115    if config.follower {
116        run_in_follower_mode(subsys, &mut worterbuch, api_rx, config, web_server).await?;
117    } else {
118        worterbuch
119            .set(
120                topic!(SYSTEM_TOPIC_ROOT, SYSTEM_TOPIC_SUPPORTED_PROTOCOL_VERSION),
121                serde_json::to_value(SUPPORTED_PROTOCOL_VERSIONS)
122                    .unwrap_or_else(|e| Value::String(format!("Error serializing version: {e}"))),
123                INTERNAL_CLIENT_ID,
124            )
125            .await?;
126
127        if use_persistence {
128            let worterbuch_pers = api.clone();
129            subsys.start(SubsystemBuilder::new("persistence", |subsys| {
130                persistence::periodic(worterbuch_pers, config_pers, subsys)
131            }));
132        }
133
134        let worterbuch_uptime = api.clone();
135        subsys.start(SubsystemBuilder::new("stats", |subsys| {
136            track_stats(worterbuch_uptime, subsys)
137        }));
138
139        let tcp_server = if let Some(Endpoint {
140            tls: _,
141            bind_addr,
142            port,
143        }) = &config.tcp_endpoint
144        {
145            let sapi = api.clone();
146            let bind_addr = bind_addr.to_owned();
147            let port = port.to_owned();
148            Some(
149                subsys.start(SubsystemBuilder::new("tcpserver", move |subsys| {
150                    server::tcp::start(sapi, bind_addr, port, subsys)
151                })),
152            )
153        } else {
154            None
155        };
156
157        #[cfg(target_family = "unix")]
158        let unix_socket = if let Some(UnixEndpoint { path }) = &config.unix_endpoint {
159            let sapi = api.clone();
160            let path = path.clone();
161            Some(
162                subsys.start(SubsystemBuilder::new("unixsocket", move |subsys| {
163                    server::unix::start(sapi, path, subsys)
164                })),
165            )
166        } else {
167            None
168        };
169
170        #[cfg(not(target_family = "unix"))]
171        let unix_socket = None;
172
173        if config.leader {
174            run_in_leader_mode(
175                subsys,
176                worterbuch,
177                api_rx,
178                config,
179                web_server,
180                tcp_server,
181                unix_socket,
182            )
183            .await?;
184        } else {
185            run_in_regular_mode(
186                subsys,
187                worterbuch,
188                api_rx,
189                config,
190                web_server,
191                tcp_server,
192                unix_socket,
193            )
194            .await?;
195        }
196    }
197
198    debug!("worterbuch subsystem completed.");
199
200    Ok(())
201}
202
203async fn process_api_call(worterbuch: &mut Worterbuch, function: WbFunction) {
204    match function {
205        WbFunction::Get(key, tx) => {
206            tx.send(worterbuch.get(&key)).ok();
207        }
208        WbFunction::CGet(key, tx) => {
209            tx.send(worterbuch.cget(&key)).ok();
210        }
211        WbFunction::Set(key, value, client_id, tx, span) => {
212            tx.send(worterbuch.set(key, value, client_id).instrument(span).await)
213                .ok();
214        }
215        WbFunction::CSet(key, value, version, client_id, tx) => {
216            tx.send(worterbuch.cset(key, value, version, client_id).await)
217                .ok();
218        }
219        WbFunction::SPubInit(transaction_id, key, client_id, tx) => {
220            tx.send(worterbuch.spub_init(transaction_id, key, client_id).await)
221                .ok();
222        }
223        WbFunction::SPub(transaction_id, value, client_id, tx) => {
224            tx.send(worterbuch.spub(transaction_id, value, client_id).await)
225                .ok();
226        }
227        WbFunction::Publish(key, value, tx) => {
228            tx.send(worterbuch.publish(key, value).await).ok();
229        }
230        WbFunction::Ls(parent, tx) => {
231            tx.send(worterbuch.ls(&parent)).ok();
232        }
233        WbFunction::PLs(parent, tx) => {
234            tx.send(worterbuch.pls(&parent)).ok();
235        }
236        WbFunction::PGet(pattern, tx) => {
237            tx.send(worterbuch.pget(&pattern)).ok();
238        }
239        WbFunction::Subscribe(client_id, transaction_id, key, unique, live_only, tx) => {
240            tx.send(
241                worterbuch
242                    .subscribe(client_id, transaction_id, key, unique, live_only)
243                    .await,
244            )
245            .ok();
246        }
247        WbFunction::PSubscribe(client_id, transaction_id, pattern, unique, live_only, tx) => {
248            tx.send(
249                worterbuch
250                    .psubscribe(client_id, transaction_id, pattern, unique, live_only)
251                    .await,
252            )
253            .ok();
254        }
255        WbFunction::SubscribeLs(client_id, transaction_id, parent, tx) => {
256            tx.send(
257                worterbuch
258                    .subscribe_ls(client_id, transaction_id, parent)
259                    .await,
260            )
261            .ok();
262        }
263        WbFunction::Unsubscribe(client_id, transaction_id, tx) => {
264            tx.send(worterbuch.unsubscribe(client_id, transaction_id).await)
265                .ok();
266        }
267        WbFunction::UnsubscribeLs(client_id, transaction_id, tx) => {
268            tx.send(worterbuch.unsubscribe_ls(client_id, transaction_id))
269                .ok();
270        }
271        WbFunction::Delete(key, client_id, tx) => {
272            tx.send(worterbuch.delete(key, client_id).await).ok();
273        }
274        WbFunction::PDelete(pattern, client_id, tx) => {
275            tx.send(worterbuch.pdelete(pattern, client_id).await).ok();
276        }
277        WbFunction::Lock(key, client_id, tx) => {
278            tx.send(worterbuch.lock(key, client_id)).ok();
279        }
280        WbFunction::AcquireLock(key, client_id, tx) => {
281            tx.send(worterbuch.acquire_lock(key, client_id).await).ok();
282        }
283        WbFunction::ReleaseLock(key, client_id, tx) => {
284            tx.send(worterbuch.release_lock(key, client_id).await).ok();
285        }
286        WbFunction::Connected(client_id, remote_addr, protocol) => {
287            worterbuch
288                .connected(client_id, remote_addr, &protocol)
289                .await;
290        }
291        WbFunction::Disconnected(client_id, remote_addr) => {
292            worterbuch.disconnected(client_id, remote_addr).await.ok();
293        }
294        WbFunction::Config(tx) => {
295            tx.send(worterbuch.config().clone()).ok();
296        }
297        WbFunction::Export(tx, span) => {
298            let g = span.enter();
299            worterbuch.export_for_persistence(tx);
300            drop(g);
301            drop(span);
302        }
303        WbFunction::Import(json, tx) => {
304            tx.send(worterbuch.import(&json).await).ok();
305        }
306        WbFunction::Len(tx) => {
307            tx.send(worterbuch.len()).ok();
308        }
309    }
310}
311
312async fn process_api_call_as_follower(worterbuch: &mut Worterbuch, function: WbFunction) {
313    match function {
314        WbFunction::Get(key, tx) => {
315            tx.send(worterbuch.get(&key)).ok();
316        }
317        WbFunction::CGet(key, tx) => {
318            tx.send(worterbuch.cget(&key)).ok();
319        }
320        WbFunction::Set(_, _, _, tx, _) => {
321            tx.send(Err(WorterbuchError::NotLeader)).ok();
322        }
323        WbFunction::CSet(_, _, _, _, tx) => {
324            tx.send(Err(WorterbuchError::NotLeader)).ok();
325        }
326        WbFunction::SPubInit(_, _, _, tx) => {
327            tx.send(Err(WorterbuchError::NotLeader)).ok();
328        }
329        WbFunction::SPub(_, _, _, tx) => {
330            tx.send(Err(WorterbuchError::NotLeader)).ok();
331        }
332        WbFunction::Publish(_, _, tx) => {
333            tx.send(Err(WorterbuchError::NotLeader)).ok();
334        }
335        WbFunction::Ls(parent, tx) => {
336            tx.send(worterbuch.ls(&parent)).ok();
337        }
338        WbFunction::PLs(parent, tx) => {
339            tx.send(worterbuch.pls(&parent)).ok();
340        }
341        WbFunction::PGet(pattern, tx) => {
342            tx.send(worterbuch.pget(&pattern)).ok();
343        }
344        WbFunction::Subscribe(client_id, transaction_id, key, unique, live_only, tx) => {
345            tx.send(
346                worterbuch
347                    .subscribe(client_id, transaction_id, key, unique, live_only)
348                    .await,
349            )
350            .ok();
351        }
352        WbFunction::PSubscribe(client_id, transaction_id, pattern, unique, live_only, tx) => {
353            tx.send(
354                worterbuch
355                    .psubscribe(client_id, transaction_id, pattern, unique, live_only)
356                    .await,
357            )
358            .ok();
359        }
360        WbFunction::SubscribeLs(client_id, transaction_id, parent, tx) => {
361            tx.send(
362                worterbuch
363                    .subscribe_ls(client_id, transaction_id, parent)
364                    .await,
365            )
366            .ok();
367        }
368        WbFunction::Unsubscribe(client_id, transaction_id, tx) => {
369            tx.send(worterbuch.unsubscribe(client_id, transaction_id).await)
370                .ok();
371        }
372        WbFunction::UnsubscribeLs(client_id, transaction_id, tx) => {
373            tx.send(worterbuch.unsubscribe_ls(client_id, transaction_id))
374                .ok();
375        }
376        WbFunction::Lock(_, _, tx) => {
377            tx.send(Err(WorterbuchError::NotLeader)).ok();
378        }
379        WbFunction::AcquireLock(_, _, tx) => {
380            tx.send(Err(WorterbuchError::NotLeader)).ok();
381        }
382        WbFunction::ReleaseLock(_, _, tx) => {
383            tx.send(Err(WorterbuchError::NotLeader)).ok();
384        }
385        WbFunction::Delete(_, _, tx) => {
386            tx.send(Err(WorterbuchError::NotLeader)).ok();
387        }
388        WbFunction::PDelete(_, _, tx) => {
389            tx.send(Err(WorterbuchError::NotLeader)).ok();
390        }
391        WbFunction::Connected(client_id, remote_addr, protocol) => {
392            worterbuch
393                .connected(client_id, remote_addr, &protocol)
394                .await;
395        }
396        WbFunction::Disconnected(client_id, remote_addr) => {
397            worterbuch.disconnected(client_id, remote_addr).await.ok();
398        }
399        WbFunction::Config(tx) => {
400            tx.send(worterbuch.config().clone()).ok();
401        }
402        WbFunction::Export(tx, span) => {
403            let _ = span.enter();
404            worterbuch.export_for_persistence(tx);
405        }
406        WbFunction::Import(_, tx) => {
407            tx.send(Err(WorterbuchError::NotLeader)).ok();
408        }
409        WbFunction::Len(tx) => {
410            tx.send(worterbuch.len()).ok();
411        }
412    }
413}
414
415async fn run_in_regular_mode(
416    subsys: SubsystemHandle,
417    mut worterbuch: Worterbuch,
418    mut api_rx: mpsc::Receiver<WbFunction>,
419    config: Config,
420    web_server: Option<ServerSubsystem>,
421    tcp_server: Option<ServerSubsystem>,
422    unix_socket: Option<ServerSubsystem>,
423) -> Result<()> {
424    loop {
425        select! {
426            recv = api_rx.recv() => match recv {
427                Some(function) => process_api_call(&mut worterbuch, function).await,
428                None => break,
429            },
430            _ = subsys.on_shutdown_requested() => break,
431        }
432    }
433
434    shutdown(
435        subsys,
436        &mut worterbuch,
437        config,
438        web_server,
439        tcp_server,
440        unix_socket,
441    )
442    .await
443}
444
445async fn run_in_leader_mode(
446    subsys: SubsystemHandle,
447    mut worterbuch: Worterbuch,
448    mut api_rx: mpsc::Receiver<WbFunction>,
449    config: Config,
450    web_server: Option<ServerSubsystem>,
451    tcp_server: Option<ServerSubsystem>,
452    unix_socket: Option<ServerSubsystem>,
453) -> Result<()> {
454    info!("Running in LEADER mode.");
455
456    shutdown_on_stdin_close(&subsys);
457
458    worterbuch
459        .set(
460            topic!(SYSTEM_TOPIC_ROOT, SYSTEM_TOPIC_MODE),
461            json!(Mode::Leader),
462            INTERNAL_CLIENT_ID,
463        )
464        .await?;
465
466    let mut client_write_txs: Vec<(usize, mpsc::Sender<ClientWriteCommand>)> = vec![];
467    let (follower_connected_tx, mut follower_connected_rx) = mpsc::channel::<
468        oneshot::Sender<(StateSync, mpsc::Receiver<ClientWriteCommand>)>,
469    >(config.channel_buffer_size);
470
471    let mut tx_id = 0;
472    let mut dead = vec![];
473
474    let cfg = config.clone();
475    subsys.start(SubsystemBuilder::new("cluster_sync_port", move |s| {
476        run_cluster_sync_port(s, cfg, follower_connected_tx)
477    }));
478
479    let (mut grave_goods_rx, _) = worterbuch
480        .psubscribe(
481            INTERNAL_CLIENT_ID,
482            0,
483            topic!(
484                SYSTEM_TOPIC_ROOT,
485                SYSTEM_TOPIC_CLIENTS,
486                KeySegment::Wildcard,
487                SYSTEM_TOPIC_GRAVE_GOODS
488            ),
489            true,
490            false,
491        )
492        .await?;
493    let (mut last_will_rx, _) = worterbuch
494        .psubscribe(
495            INTERNAL_CLIENT_ID,
496            0,
497            topic!(
498                SYSTEM_TOPIC_ROOT,
499                SYSTEM_TOPIC_CLIENTS,
500                KeySegment::Wildcard,
501                SYSTEM_TOPIC_LAST_WILL
502            ),
503            true,
504            false,
505        )
506        .await?;
507
508    loop {
509        select! {
510            recv = grave_goods_rx.recv() => if let Some(e) = recv {
511                debug!("Forwarding grave goods change: {e:?}");
512                match e {
513                    PStateEvent::KeyValuePairs(kvps) => {
514                        for kvp in kvps {
515                            let span = span!(Level::DEBUG, "forward_grave_goods");
516                            forward_api_call(&mut client_write_txs, &mut dead, &WbFunction::Set(kvp.key, kvp.value, INTERNAL_CLIENT_ID, oneshot::channel().0, span), false).await;
517                        }
518                    },
519                    PStateEvent::Deleted(kvps) => {
520                        for kvp in kvps {
521                            forward_api_call(&mut client_write_txs, &mut dead, &WbFunction::Delete(kvp.key, INTERNAL_CLIENT_ID, oneshot::channel().0), false).await;
522                        }
523                    },
524                }
525            },
526            recv = last_will_rx.recv() => if let Some(e) = recv {
527                debug!("Forwarding last will change: {e:?}");
528                match e {
529                    PStateEvent::KeyValuePairs(kvps) => {
530                        for kvp in kvps {
531                            let span = span!(Level::DEBUG, "forward_last_will");
532                            forward_api_call(&mut client_write_txs, &mut dead, &WbFunction::Set(kvp.key, kvp.value, INTERNAL_CLIENT_ID, oneshot::channel().0, span), false).await;
533                        }
534                    },
535                    PStateEvent::Deleted(kvps) => {
536                        for kvp in kvps {
537                            forward_api_call(&mut client_write_txs, &mut dead, &WbFunction::Delete(kvp.key, INTERNAL_CLIENT_ID, oneshot::channel().0), false).await;
538                        }
539                    },
540                }
541            },
542            recv = api_rx.recv() => match recv {
543                Some(WbFunction::Import(json, tx)) => {
544                    let (tx_int, rx_int) = oneshot::channel();
545                    process_api_call(&mut worterbuch, WbFunction::Import(json, tx_int)).await;
546                    let imported_values = rx_int.await.into_diagnostic()??;
547
548                    for (key, (value, changed))  in &imported_values {
549                        if *changed {
550                            let cmd = match value.to_owned() {
551                                ValueEntry::Cas(value, version) => ClientWriteCommand::CSet(key.to_owned(), value, version),
552                                ValueEntry::Plain(value) => ClientWriteCommand::Set(key.to_owned(), value),
553                            };
554                            forward_to_followers(cmd, &mut client_write_txs, &mut dead).await;
555                        }
556                    }
557                    tx.send(Ok(imported_values)).ok();
558                },
559                Some(function) => {
560                    forward_api_call(&mut client_write_txs, &mut dead, &function, true).await;
561                    process_api_call(&mut worterbuch, function).await;
562                },
563                None => break,
564            },
565            recv = follower_connected_rx.recv() => match recv {
566                Some(state_tx) => {
567                    let (client_write_tx, client_write_rx) = mpsc::channel(config.channel_buffer_size);
568                    let (current_state, grave_goods, last_will) = worterbuch.export();
569                    if state_tx.send((StateSync(current_state, grave_goods, last_will), client_write_rx)).is_ok() {
570                        client_write_txs.push((tx_id, client_write_tx));
571                        tx_id += 1;
572                    }
573                },
574                None => break,
575            },
576            _ = subsys.on_shutdown_requested() => break,
577        }
578    }
579
580    shutdown(
581        subsys,
582        &mut worterbuch,
583        config,
584        web_server,
585        tcp_server,
586        unix_socket,
587    )
588    .await
589}
590
591async fn run_in_follower_mode(
592    subsys: SubsystemHandle,
593    worterbuch: &mut Worterbuch,
594    mut api_rx: mpsc::Receiver<WbFunction>,
595    config: Config,
596    web_server: Option<ServerSubsystem>,
597) -> Result<()> {
598    let leader_addr = if let Some(it) = &config.leader_address {
599        it
600    } else {
601        return Err(miette!("No valid leader address configured."));
602    };
603    info!("Running in FOLLOWER mode. Leader: {}", leader_addr,);
604
605    shutdown_on_stdin_close(&subsys);
606
607    worterbuch
608        .set(
609            topic!(SYSTEM_TOPIC_ROOT, SYSTEM_TOPIC_MODE),
610            json!(Mode::Follower),
611            INTERNAL_CLIENT_ID,
612        )
613        .await?;
614
615    let mut persistence_interval = config.persistence_interval();
616
617    let stream = TcpStream::connect(leader_addr).await.into_diagnostic()?;
618
619    let mut lines = BufReader::new(stream).lines();
620
621    info!("Waiting for initial sync message from leader …");
622    select! {
623        recv = receive_msg(&mut lines) => match recv {
624            Ok(Some(msg)) => {
625                if let LeaderSyncMessage::Init(state) = msg {
626                    initial_sync(state, worterbuch).await?;
627                    PERSISTENCE_LOCKED.store(false, Ordering::Release);
628                    persistence_interval.reset();
629                    persistence::synchronous(worterbuch, &config).await?;
630                } else {
631                    return Err(miette!("first message from leader is supposed to be the initial sync, but it wasn't"));
632                }
633            },
634            Ok(None) => return Err(miette!("connection to leader closed before initial sync")),
635            Err(e) => {
636                return Err(miette!("error receiving update from leader: {e}"));
637            }
638        },
639        _ = subsys.on_shutdown_requested() => return Err(miette!("shut down before initial sync")),
640    }
641    info!("Successfully synced with leader.");
642
643    loop {
644        select! {
645            recv = receive_msg(&mut lines) => match recv {
646                Ok(Some(msg)) => process_leader_message(msg, worterbuch).await?,
647                Ok(None) => break,
648                Err(e) => {
649                    error!("Error receiving update from leader: {e}");
650                    break;
651                }
652            },
653            recv = api_rx.recv() => match recv {
654                Some(function) => process_api_call_as_follower(worterbuch, function).await,
655                None => break,
656            },
657            _ = persistence_interval.tick() => {
658                debug!("Follower persistence interval triggered");
659                persistence::synchronous(worterbuch, &config).await?;
660            },
661            _ = subsys.on_shutdown_requested() => break,
662        }
663    }
664
665    shutdown(subsys, worterbuch, config, web_server, None, None).await
666}
667
668async fn shutdown(
669    subsys: SubsystemHandle,
670    worterbuch: &mut Worterbuch,
671    config: Config,
672    web_server: Option<ServerSubsystem>,
673    tcp_server: Option<ServerSubsystem>,
674    unix_socket: Option<ServerSubsystem>,
675) -> Result<()> {
676    info!("Shutdown sequence triggered");
677
678    subsys.request_shutdown();
679
680    shutdown_servers(web_server, tcp_server, unix_socket).await;
681
682    if config.use_persistence {
683        info!("Applying grave goods and last wills …");
684        worterbuch.apply_all_grave_goods_and_last_wills().await;
685        info!("Waiting for persistence hook to complete …");
686        persistence::synchronous(worterbuch, &config).await?;
687        info!("Shutdown persistence hook complete.");
688    }
689
690    Ok(())
691}
692
693async fn shutdown_servers(
694    web_server: Option<ServerSubsystem>,
695    tcp_server: Option<ServerSubsystem>,
696    unix_socket: Option<ServerSubsystem>,
697) {
698    if let Some(it) = web_server {
699        info!("Shutting down web server …");
700        it.initiate_shutdown();
701        if let Err(e) = it.join().await {
702            error!("Error waiting for web server to shut down: {e}");
703        }
704    }
705
706    if let Some(it) = tcp_server {
707        info!("Shutting down tcp server …");
708        it.initiate_shutdown();
709        if let Err(e) = it.join().await {
710            error!("Error waiting for tcp server to shut down: {e}");
711        }
712    }
713
714    if let Some(it) = unix_socket {
715        info!("Shutting down unix socket …");
716        it.initiate_shutdown();
717        if let Err(e) = it.join().await {
718            error!("Error waiting for unix socket to shut down: {e}");
719        }
720    }
721}
722
723async fn forward_api_call(
724    client_write_txs: &mut Vec<(usize, mpsc::Sender<ClientWriteCommand>)>,
725    dead: &mut Vec<usize>,
726    function: &WbFunction,
727    filter_sys: bool,
728) {
729    if let Some(cmd) = match function {
730        WbFunction::Get(_, _)
731        | WbFunction::CGet(_, _)
732        | WbFunction::SPubInit(_, _, _, _)
733        | WbFunction::SPub(_, _, _, _)
734        | WbFunction::Publish(_, _, _)
735        | WbFunction::Ls(_, _)
736        | WbFunction::PLs(_, _)
737        | WbFunction::PGet(_, _)
738        | WbFunction::Subscribe(_, _, _, _, _, _)
739        | WbFunction::PSubscribe(_, _, _, _, _, _)
740        | WbFunction::SubscribeLs(_, _, _, _)
741        | WbFunction::Unsubscribe(_, _, _)
742        | WbFunction::UnsubscribeLs(_, _, _)
743        | WbFunction::Connected(_, _, _)
744        | WbFunction::Disconnected(_, _)
745        | WbFunction::Config(_)
746        | WbFunction::Export(_, _)
747        | WbFunction::Import(_, _)
748        | WbFunction::Len(_)
749        | WbFunction::Lock(_, _, _)
750        | WbFunction::AcquireLock(_, _, _)
751        | WbFunction::ReleaseLock(_, _, _) => None,
752        WbFunction::Set(key, value, _, _, _) => {
753            if !filter_sys || !key.starts_with(SYSTEM_TOPIC_ROOT_PREFIX) {
754                Some(ClientWriteCommand::Set(key.to_owned(), value.to_owned()))
755            } else {
756                None
757            }
758        }
759        WbFunction::CSet(key, value, version, _, _) => {
760            if !filter_sys || !key.starts_with(SYSTEM_TOPIC_ROOT_PREFIX) {
761                Some(ClientWriteCommand::CSet(
762                    key.to_owned(),
763                    value.to_owned(),
764                    version.to_owned(),
765                ))
766            } else {
767                None
768            }
769        }
770        WbFunction::Delete(key, _, _) => {
771            if !filter_sys || !key.starts_with(SYSTEM_TOPIC_ROOT_PREFIX) {
772                Some(ClientWriteCommand::Delete(key.to_owned()))
773            } else {
774                None
775            }
776        }
777        WbFunction::PDelete(pattern, _, _) => {
778            if !filter_sys || !pattern.starts_with(SYSTEM_TOPIC_ROOT_PREFIX) {
779                Some(ClientWriteCommand::PDelete(pattern.to_owned()))
780            } else {
781                None
782            }
783        }
784    } {
785        forward_to_followers(cmd, client_write_txs, dead).await;
786    }
787}
788
789async fn forward_to_followers(
790    cmd: ClientWriteCommand,
791    client_write_txs: &mut Vec<(usize, mpsc::Sender<ClientWriteCommand>)>,
792    dead: &mut Vec<usize>,
793) {
794    for (id, tx) in client_write_txs.iter() {
795        if tx.send(cmd.clone()).await.is_err() {
796            dead.push(*id);
797        }
798    }
799    if !dead.is_empty() {
800        client_write_txs.retain(|(i, _)| !dead.contains(i));
801        dead.clear();
802    }
803}