worterbuch/
lib.rs

1/*
2 *  The worterbuch application library
3 *
4 *  Copyright (C) 2024 Michael Bachmann
5 *
6 *  This program is free software: you can redistribute it and/or modify
7 *  it under the terms of the GNU Affero General Public License as published by
8 *  the Free Software Foundation, either version 3 of the License, or
9 *  (at your option) any later version.
10 *
11 *  This program is distributed in the hope that it will be useful,
12 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 *  GNU Affero General Public License for more details.
15 *
16 *  You should have received a copy of the GNU Affero General Public License
17 *  along with this program.  If not, see <https://www.gnu.org/licenses/>.
18 */
19
20//! This library allows you to embed worterbuch into your application.
21//!
22//! Note that while it makes embedding very easy, it does leak several
23//! dependencies into your application that a proper library normally
24//! shouldn't. Worterbuch takes this liberty because it is essentially
25//! still an application. Just one that you can start from within your
26//! own application.
27
28mod auth;
29mod config;
30pub mod error;
31mod leader_follower;
32pub(crate) mod license;
33#[cfg(not(feature = "telemetry"))]
34pub mod logging;
35mod mem_tools;
36mod persistence;
37pub mod server;
38mod stats;
39pub(crate) mod store;
40mod subscribers;
41#[cfg(feature = "telemetry")]
42pub mod telemetry;
43mod worterbuch;
44
45pub use config::*;
46pub use worterbuch_common as common;
47
48use crate::{
49    error::{WorterbuchAppError, WorterbuchAppResult},
50    persistence::unlock_persistence,
51    server::{CloneableWbApi, common::SUPPORTED_PROTOCOL_VERSIONS},
52    stats::track_stats,
53    worterbuch::Worterbuch,
54};
55use common::{
56    KeySegment, PStateEvent, SYSTEM_TOPIC_CLIENTS, SYSTEM_TOPIC_GRAVE_GOODS,
57    SYSTEM_TOPIC_LAST_WILL, SYSTEM_TOPIC_MODE, SYSTEM_TOPIC_ROOT, SYSTEM_TOPIC_ROOT_PREFIX,
58    SYSTEM_TOPIC_SUPPORTED_PROTOCOL_VERSION, Value, error::WorterbuchError, receive_msg, topic,
59};
60use leader_follower::{
61    ClientWriteCommand, LeaderSyncMessage, Mode, StateSync, initial_sync, process_leader_message,
62    run_cluster_sync_port, shutdown_on_stdin_close,
63};
64use serde_json::json;
65use server::common::WbFunction;
66use std::error::Error;
67use tokio::{
68    io::{AsyncBufReadExt, BufReader},
69    net::TcpStream,
70    select,
71    sync::{mpsc, oneshot},
72};
73use tokio_graceful_shutdown::{SubsystemBuilder, SubsystemHandle};
74use tracing::{Instrument, Level, debug, error, info, span};
75use worterbuch_common::{INTERNAL_CLIENT_ID, ValueEntry};
76
77type ServerSubsystem = tokio_graceful_shutdown::NestedSubsystem<Box<dyn Error + Send + Sync>>;
78
79pub async fn spawn_worterbuch(
80    subsys: &SubsystemHandle,
81    config: Config,
82) -> WorterbuchAppResult<CloneableWbApi> {
83    let (api_tx, api_rx) = oneshot::channel();
84    subsys.start(SubsystemBuilder::new(
85        "worterbuch",
86        async |s: &mut SubsystemHandle| do_run_worterbuch(s, config, Some(api_tx)).await,
87    ));
88    Ok(api_rx.await?)
89}
90
91pub async fn run_worterbuch(
92    subsys: &mut SubsystemHandle,
93    config: Config,
94) -> WorterbuchAppResult<()> {
95    do_run_worterbuch(subsys, config, None).await?;
96    Ok(())
97}
98
99async fn do_run_worterbuch(
100    subsys: &mut SubsystemHandle,
101    config: Config,
102    tx: Option<oneshot::Sender<CloneableWbApi>>,
103) -> WorterbuchAppResult<()> {
104    let channel_buffer_size = config.channel_buffer_size;
105    let (api_tx, api_rx) = mpsc::channel(channel_buffer_size);
106    let api = CloneableWbApi::new(api_tx, config.clone());
107
108    if let Some(tx) = tx {
109        tx.send(api.clone()).ok();
110    }
111
112    let mut worterbuch = persistence::restore(subsys, &config, &api).await?;
113
114    let web_server = if let Some(WsEndpoint {
115        endpoint: Endpoint {
116            tls,
117            bind_addr,
118            port,
119        },
120        public_addr,
121    }) = &config.ws_endpoint
122    {
123        let sapi = api.clone();
124        let tls = tls.to_owned();
125        let bind_addr = bind_addr.to_owned();
126        let port = port.to_owned();
127        let public_addr = public_addr.to_owned();
128        let ws_enabled = !config.follower;
129        Some(subsys.start(SubsystemBuilder::new(
130            "webserver",
131            async move |subsys: &mut SubsystemHandle| {
132                server::axum::start(sapi, tls, bind_addr, port, public_addr, subsys, ws_enabled)
133                    .await
134            },
135        )))
136    } else {
137        None
138    };
139
140    if config.follower {
141        run_in_follower_mode(subsys, worterbuch, api_rx, config, web_server).await?;
142    } else {
143        worterbuch
144            .set(
145                topic!(SYSTEM_TOPIC_ROOT, SYSTEM_TOPIC_SUPPORTED_PROTOCOL_VERSION),
146                serde_json::to_value(SUPPORTED_PROTOCOL_VERSIONS)
147                    .unwrap_or_else(|e| Value::String(format!("Error serializing version: {e}"))),
148                INTERNAL_CLIENT_ID,
149            )
150            .await?;
151
152        let worterbuch_uptime = api.clone();
153        subsys.start(SubsystemBuilder::new(
154            "stats",
155            async |subsys: &mut SubsystemHandle| track_stats(worterbuch_uptime, subsys).await,
156        ));
157
158        let cfg = config.clone();
159        let tcp_server = if let Some(Endpoint {
160            tls: _,
161            bind_addr,
162            port,
163        }) = &config.tcp_endpoint
164        {
165            let sapi = api.clone();
166            let bind_addr = bind_addr.to_owned();
167            let port = port.to_owned();
168            Some(subsys.start(SubsystemBuilder::new(
169                "tcpserver",
170                async move |subsys: &mut SubsystemHandle| {
171                    server::tcp::start(sapi, cfg, bind_addr, port, subsys).await
172                },
173            )))
174        } else {
175            None
176        };
177
178        #[cfg(target_family = "unix")]
179        let unix_socket = if let Some(UnixEndpoint { path }) = &config.unix_endpoint {
180            let sapi = api.clone();
181            let path = path.clone();
182            Some(subsys.start(SubsystemBuilder::new(
183                "unixsocket",
184                async move |subsys: &mut SubsystemHandle| {
185                    server::unix::start(sapi, path, subsys).await
186                },
187            )))
188        } else {
189            None
190        };
191
192        #[cfg(not(target_family = "unix"))]
193        let unix_socket = None;
194
195        if config.leader {
196            run_in_leader_mode(
197                subsys,
198                worterbuch,
199                api_rx,
200                config,
201                web_server,
202                tcp_server,
203                unix_socket,
204            )
205            .await?;
206        } else {
207            run_in_regular_mode(
208                subsys,
209                worterbuch,
210                api_rx,
211                config,
212                web_server,
213                tcp_server,
214                unix_socket,
215            )
216            .await?;
217        }
218    }
219
220    debug!("worterbuch subsystem completed.");
221
222    Ok(())
223}
224
225async fn process_api_call(worterbuch: &mut Worterbuch, function: WbFunction) {
226    match function {
227        WbFunction::Get(key, tx) => {
228            tx.send(worterbuch.get(&key)).ok();
229        }
230        WbFunction::CGet(key, tx) => {
231            tx.send(worterbuch.cget(&key)).ok();
232        }
233        WbFunction::Set(key, value, client_id, tx, span) => {
234            tx.send(worterbuch.set(key, value, client_id).instrument(span).await)
235                .ok();
236        }
237        WbFunction::CSet(key, value, version, client_id, tx) => {
238            tx.send(worterbuch.cset(key, value, version, client_id).await)
239                .ok();
240        }
241        WbFunction::SPubInit(transaction_id, key, client_id, tx) => {
242            tx.send(worterbuch.spub_init(transaction_id, key, client_id).await)
243                .ok();
244        }
245        WbFunction::SPub(transaction_id, value, client_id, tx) => {
246            tx.send(worterbuch.spub(transaction_id, value, client_id).await)
247                .ok();
248        }
249        WbFunction::Publish(key, value, tx) => {
250            tx.send(worterbuch.publish(key, value).await).ok();
251        }
252        WbFunction::Ls(parent, tx) => {
253            tx.send(worterbuch.ls(&parent)).ok();
254        }
255        WbFunction::PLs(parent, tx) => {
256            tx.send(worterbuch.pls(&parent)).ok();
257        }
258        WbFunction::PGet(pattern, tx) => {
259            tx.send(worterbuch.pget(&pattern)).ok();
260        }
261        WbFunction::Subscribe(client_id, transaction_id, key, unique, live_only, tx) => {
262            tx.send(
263                worterbuch
264                    .subscribe(client_id, transaction_id, key, unique, live_only)
265                    .await,
266            )
267            .ok();
268        }
269        WbFunction::PSubscribe(client_id, transaction_id, pattern, unique, live_only, tx) => {
270            tx.send(
271                worterbuch
272                    .psubscribe(client_id, transaction_id, pattern, unique, live_only)
273                    .await,
274            )
275            .ok();
276        }
277        WbFunction::SubscribeLs(client_id, transaction_id, parent, tx) => {
278            tx.send(
279                worterbuch
280                    .subscribe_ls(client_id, transaction_id, parent)
281                    .await,
282            )
283            .ok();
284        }
285        WbFunction::Unsubscribe(client_id, transaction_id, tx) => {
286            tx.send(worterbuch.unsubscribe(client_id, transaction_id).await)
287                .ok();
288        }
289        WbFunction::UnsubscribeLs(client_id, transaction_id, tx) => {
290            tx.send(worterbuch.unsubscribe_ls(client_id, transaction_id))
291                .ok();
292        }
293        WbFunction::Delete(key, client_id, tx) => {
294            tx.send(worterbuch.delete(key, client_id).await).ok();
295        }
296        WbFunction::PDelete(pattern, client_id, tx) => {
297            tx.send(worterbuch.pdelete(pattern, client_id).await).ok();
298        }
299        WbFunction::Lock(key, client_id, tx) => {
300            tx.send(worterbuch.lock(key, client_id).await).ok();
301        }
302        WbFunction::AcquireLock(key, client_id, tx) => {
303            tx.send(worterbuch.acquire_lock(key, client_id).await).ok();
304        }
305        WbFunction::ReleaseLock(key, client_id, tx) => {
306            tx.send(worterbuch.release_lock(key, client_id).await).ok();
307        }
308        WbFunction::Connected(client_id, remote_addr, protocol, tx) => {
309            let res = worterbuch
310                .connected(client_id, remote_addr, &protocol)
311                .await;
312            tx.send(res).ok();
313        }
314        WbFunction::ProtocolSwitched(client_id, protocol) => {
315            worterbuch.protocol_switched(client_id, protocol).await;
316        }
317        WbFunction::Disconnected(client_id, remote_addr) => {
318            worterbuch.disconnected(client_id, remote_addr).await.ok();
319        }
320        WbFunction::Config(tx) => {
321            tx.send(worterbuch.config().clone()).ok();
322        }
323        WbFunction::Export(tx, span) => {
324            let g = span.enter();
325            worterbuch.export_for_persistence(tx);
326            drop(g);
327            drop(span);
328        }
329        WbFunction::Import(json, tx) => {
330            tx.send(worterbuch.import(&json).await).ok();
331        }
332        WbFunction::Len(tx) => {
333            tx.send(worterbuch.len()).ok();
334        }
335    }
336}
337
338async fn process_api_call_as_follower(worterbuch: &mut Worterbuch, function: WbFunction) {
339    match function {
340        WbFunction::Get(key, tx) => {
341            tx.send(worterbuch.get(&key)).ok();
342        }
343        WbFunction::CGet(key, tx) => {
344            tx.send(worterbuch.cget(&key)).ok();
345        }
346        WbFunction::Set(_, _, _, tx, _) => {
347            tx.send(Err(WorterbuchError::NotLeader)).ok();
348        }
349        WbFunction::CSet(_, _, _, _, tx) => {
350            tx.send(Err(WorterbuchError::NotLeader)).ok();
351        }
352        WbFunction::SPubInit(_, _, _, tx) => {
353            tx.send(Err(WorterbuchError::NotLeader)).ok();
354        }
355        WbFunction::SPub(_, _, _, tx) => {
356            tx.send(Err(WorterbuchError::NotLeader)).ok();
357        }
358        WbFunction::Publish(_, _, tx) => {
359            tx.send(Err(WorterbuchError::NotLeader)).ok();
360        }
361        WbFunction::Ls(parent, tx) => {
362            tx.send(worterbuch.ls(&parent)).ok();
363        }
364        WbFunction::PLs(parent, tx) => {
365            tx.send(worterbuch.pls(&parent)).ok();
366        }
367        WbFunction::PGet(pattern, tx) => {
368            tx.send(worterbuch.pget(&pattern)).ok();
369        }
370        WbFunction::Subscribe(client_id, transaction_id, key, unique, live_only, tx) => {
371            tx.send(
372                worterbuch
373                    .subscribe(client_id, transaction_id, key, unique, live_only)
374                    .await,
375            )
376            .ok();
377        }
378        WbFunction::PSubscribe(client_id, transaction_id, pattern, unique, live_only, tx) => {
379            tx.send(
380                worterbuch
381                    .psubscribe(client_id, transaction_id, pattern, unique, live_only)
382                    .await,
383            )
384            .ok();
385        }
386        WbFunction::SubscribeLs(client_id, transaction_id, parent, tx) => {
387            tx.send(
388                worterbuch
389                    .subscribe_ls(client_id, transaction_id, parent)
390                    .await,
391            )
392            .ok();
393        }
394        WbFunction::Unsubscribe(client_id, transaction_id, tx) => {
395            tx.send(worterbuch.unsubscribe(client_id, transaction_id).await)
396                .ok();
397        }
398        WbFunction::UnsubscribeLs(client_id, transaction_id, tx) => {
399            tx.send(worterbuch.unsubscribe_ls(client_id, transaction_id))
400                .ok();
401        }
402        WbFunction::Lock(_, _, tx) => {
403            tx.send(Err(WorterbuchError::NotLeader)).ok();
404        }
405        WbFunction::AcquireLock(_, _, tx) => {
406            tx.send(Err(WorterbuchError::NotLeader)).ok();
407        }
408        WbFunction::ReleaseLock(_, _, tx) => {
409            tx.send(Err(WorterbuchError::NotLeader)).ok();
410        }
411        WbFunction::Delete(_, _, tx) => {
412            tx.send(Err(WorterbuchError::NotLeader)).ok();
413        }
414        WbFunction::PDelete(_, _, tx) => {
415            tx.send(Err(WorterbuchError::NotLeader)).ok();
416        }
417        WbFunction::Connected(client_id, remote_addr, protocol, tx) => {
418            let res = worterbuch
419                .connected(client_id, remote_addr, &protocol)
420                .await;
421            tx.send(res).ok();
422        }
423        WbFunction::ProtocolSwitched(client_id, protocol) => {
424            worterbuch.protocol_switched(client_id, protocol).await;
425        }
426        WbFunction::Disconnected(client_id, remote_addr) => {
427            worterbuch.disconnected(client_id, remote_addr).await.ok();
428        }
429        WbFunction::Config(tx) => {
430            tx.send(worterbuch.config().clone()).ok();
431        }
432        WbFunction::Export(tx, span) => {
433            _ = span.enter();
434            worterbuch.export_for_persistence(tx);
435        }
436        WbFunction::Import(_, tx) => {
437            tx.send(Err(WorterbuchError::NotLeader)).ok();
438        }
439        WbFunction::Len(tx) => {
440            tx.send(worterbuch.len()).ok();
441        }
442    }
443}
444
445async fn run_in_regular_mode(
446    subsys: &SubsystemHandle,
447    mut worterbuch: Worterbuch,
448    mut api_rx: mpsc::Receiver<WbFunction>,
449    config: Config,
450    web_server: Option<ServerSubsystem>,
451    tcp_server: Option<ServerSubsystem>,
452    unix_socket: Option<ServerSubsystem>,
453) -> WorterbuchAppResult<()> {
454    loop {
455        select! {
456            recv = api_rx.recv() => match recv {
457                Some(function) => process_api_call(&mut worterbuch, function).await,
458                None => break,
459            },
460            _ = subsys.on_shutdown_requested() => break,
461        }
462    }
463
464    shutdown(
465        subsys,
466        worterbuch,
467        config,
468        web_server,
469        tcp_server,
470        unix_socket,
471    )
472    .await
473}
474
475async fn run_in_leader_mode(
476    subsys: &SubsystemHandle,
477    mut worterbuch: Worterbuch,
478    mut api_rx: mpsc::Receiver<WbFunction>,
479    config: Config,
480    web_server: Option<ServerSubsystem>,
481    tcp_server: Option<ServerSubsystem>,
482    unix_socket: Option<ServerSubsystem>,
483) -> WorterbuchAppResult<()> {
484    info!("Running in LEADER mode.");
485
486    shutdown_on_stdin_close(subsys);
487
488    worterbuch
489        .set(
490            topic!(SYSTEM_TOPIC_ROOT, SYSTEM_TOPIC_MODE),
491            json!(Mode::Leader),
492            INTERNAL_CLIENT_ID,
493        )
494        .await?;
495
496    let mut client_write_txs: Vec<(usize, mpsc::Sender<ClientWriteCommand>)> = vec![];
497    let (follower_connected_tx, mut follower_connected_rx) = mpsc::channel::<
498        oneshot::Sender<(StateSync, mpsc::Receiver<ClientWriteCommand>)>,
499    >(config.channel_buffer_size);
500
501    let mut tx_id = 0;
502    let mut dead = vec![];
503
504    let cfg = config.clone();
505    subsys.start(SubsystemBuilder::new(
506        "cluster_sync_port",
507        async move |s: &mut SubsystemHandle| {
508            run_cluster_sync_port(s, cfg, follower_connected_tx).await
509        },
510    ));
511
512    let (mut grave_goods_rx, _) = worterbuch
513        .psubscribe(
514            INTERNAL_CLIENT_ID,
515            0,
516            topic!(
517                SYSTEM_TOPIC_ROOT,
518                SYSTEM_TOPIC_CLIENTS,
519                KeySegment::Wildcard,
520                SYSTEM_TOPIC_GRAVE_GOODS
521            ),
522            true,
523            false,
524        )
525        .await?;
526    let (mut last_will_rx, _) = worterbuch
527        .psubscribe(
528            INTERNAL_CLIENT_ID,
529            0,
530            topic!(
531                SYSTEM_TOPIC_ROOT,
532                SYSTEM_TOPIC_CLIENTS,
533                KeySegment::Wildcard,
534                SYSTEM_TOPIC_LAST_WILL
535            ),
536            true,
537            false,
538        )
539        .await?;
540
541    loop {
542        select! {
543            recv = grave_goods_rx.recv() => if let Some(e) = recv {
544                debug!("Forwarding grave goods change: {e:?}");
545                match e {
546                    PStateEvent::KeyValuePairs(kvps) => {
547                        for kvp in kvps {
548                            let span = span!(Level::DEBUG, "forward_grave_goods");
549                            forward_api_call(&mut client_write_txs, &mut dead, &WbFunction::Set(kvp.key, kvp.value, INTERNAL_CLIENT_ID, oneshot::channel().0, span), false).await;
550                        }
551                    },
552                    PStateEvent::Deleted(kvps) => {
553                        for kvp in kvps {
554                            forward_api_call(&mut client_write_txs, &mut dead, &WbFunction::Delete(kvp.key, INTERNAL_CLIENT_ID, oneshot::channel().0), false).await;
555                        }
556                    },
557                }
558            },
559            recv = last_will_rx.recv() => if let Some(e) = recv {
560                debug!("Forwarding last will change: {e:?}");
561                match e {
562                    PStateEvent::KeyValuePairs(kvps) => {
563                        for kvp in kvps {
564                            let span = span!(Level::DEBUG, "forward_last_will");
565                            forward_api_call(&mut client_write_txs, &mut dead, &WbFunction::Set(kvp.key, kvp.value, INTERNAL_CLIENT_ID, oneshot::channel().0, span), false).await;
566                        }
567                    },
568                    PStateEvent::Deleted(kvps) => {
569                        for kvp in kvps {
570                            forward_api_call(&mut client_write_txs, &mut dead, &WbFunction::Delete(kvp.key, INTERNAL_CLIENT_ID, oneshot::channel().0), false).await;
571                        }
572                    },
573                }
574            },
575            recv = api_rx.recv() => match recv {
576                Some(WbFunction::Import(json, tx)) => {
577                    let (tx_int, rx_int) = oneshot::channel();
578                    process_api_call(&mut worterbuch, WbFunction::Import(json, tx_int)).await;
579                    let imported_values = rx_int.await??;
580
581                    for (key, (value, changed))  in &imported_values {
582                        if *changed {
583                            let cmd = match value.to_owned() {
584                                ValueEntry::Cas(value, version) => ClientWriteCommand::CSet(key.to_owned(), value, version),
585                                ValueEntry::Plain(value) => ClientWriteCommand::Set(key.to_owned(), value),
586                            };
587                            forward_to_followers(cmd, &mut client_write_txs, &mut dead).await;
588                        }
589                    }
590                    tx.send(Ok(imported_values)).ok();
591                },
592                Some(function) => {
593                    forward_api_call(&mut client_write_txs, &mut dead, &function, true).await;
594                    process_api_call(&mut worterbuch, function).await;
595                },
596                None => break,
597            },
598            recv = follower_connected_rx.recv() => match recv {
599                Some(state_tx) => {
600                    let (client_write_tx, client_write_rx) = mpsc::channel(config.channel_buffer_size);
601                    let (current_state, grave_goods, last_will) = worterbuch.export();
602                    if state_tx.send((StateSync(current_state, grave_goods, last_will), client_write_rx)).is_ok() {
603                        client_write_txs.push((tx_id, client_write_tx));
604                        tx_id += 1;
605                    }
606                },
607                None => break,
608            },
609            _ = subsys.on_shutdown_requested() => break,
610        }
611    }
612
613    shutdown(
614        subsys,
615        worterbuch,
616        config,
617        web_server,
618        tcp_server,
619        unix_socket,
620    )
621    .await
622}
623
624async fn run_in_follower_mode(
625    subsys: &SubsystemHandle,
626    mut worterbuch: Worterbuch,
627    mut api_rx: mpsc::Receiver<WbFunction>,
628    config: Config,
629    web_server: Option<ServerSubsystem>,
630) -> WorterbuchAppResult<()> {
631    let leader_addr = if let Some(it) = &config.leader_address {
632        it
633    } else {
634        return Err(WorterbuchAppError::ConfigError(
635            "No valid leader address configured.".to_owned(),
636        ));
637    };
638    info!("Running in FOLLOWER mode. Leader: {}", leader_addr,);
639
640    shutdown_on_stdin_close(subsys);
641
642    worterbuch
643        .set(
644            topic!(SYSTEM_TOPIC_ROOT, SYSTEM_TOPIC_MODE),
645            json!(Mode::Follower),
646            INTERNAL_CLIENT_ID,
647        )
648        .await?;
649
650    let mut persistence_interval = config.persistence_interval();
651
652    let stream = TcpStream::connect(leader_addr).await?;
653
654    let mut lines = BufReader::new(stream).lines();
655
656    info!("Waiting for initial sync message from leader …");
657    select! {
658        recv = receive_msg(&mut lines) => match recv {
659            Ok(Some(msg)) => {
660                if let LeaderSyncMessage::Init(state) = msg {
661                    initial_sync(state, &mut worterbuch).await?;
662                    unlock_persistence();
663                    persistence_interval.reset();
664                    worterbuch.flush().await?;
665                } else {
666                    return Err(WorterbuchAppError::ClusterError("first message from leader is supposed to be the initial sync, but it wasn't".to_owned()));
667                }
668            },
669            Ok(None) => return Err(WorterbuchAppError::ClusterError("connection to leader closed before initial sync".to_owned())),
670            Err(e) => {
671                return Err(WorterbuchAppError::ClusterError(format!("error receiving update from leader: {e}")));
672            }
673        },
674        _ = subsys.on_shutdown_requested() => return Err(WorterbuchAppError::ClusterError("shut down before initial sync".to_owned())),
675    }
676    info!("Successfully synced with leader.");
677
678    loop {
679        select! {
680            recv = receive_msg(&mut lines) => match recv {
681                Ok(Some(msg)) => process_leader_message(msg, &mut worterbuch).await?,
682                Ok(None) => break,
683                Err(e) => {
684                    error!("Error receiving update from leader: {e}");
685                    break;
686                }
687            },
688            recv = api_rx.recv() => match recv {
689                Some(function) => process_api_call_as_follower(&mut worterbuch, function).await,
690                None => break,
691            },
692            _ = persistence_interval.tick() => {
693                debug!("Follower persistence interval triggered");
694                worterbuch.flush().await?;
695            },
696            _ = subsys.on_shutdown_requested() => break,
697        }
698    }
699
700    shutdown(subsys, worterbuch, config, web_server, None, None).await
701}
702
703async fn shutdown(
704    subsys: &SubsystemHandle,
705    mut worterbuch: Worterbuch,
706    config: Config,
707    web_server: Option<ServerSubsystem>,
708    tcp_server: Option<ServerSubsystem>,
709    unix_socket: Option<ServerSubsystem>,
710) -> WorterbuchAppResult<()> {
711    info!("Shutdown sequence triggered");
712
713    subsys.request_shutdown();
714
715    shutdown_servers(web_server, tcp_server, unix_socket).await;
716
717    if config.use_persistence {
718        info!("Applying grave goods and last wills …");
719        worterbuch.apply_all_grave_goods_and_last_wills().await;
720        info!("Waiting for persistence hook to complete …");
721        worterbuch.flush().await?;
722        info!("Shutdown persistence hook complete.");
723    }
724
725    Ok(())
726}
727
728async fn shutdown_servers(
729    web_server: Option<ServerSubsystem>,
730    tcp_server: Option<ServerSubsystem>,
731    unix_socket: Option<ServerSubsystem>,
732) {
733    if let Some(it) = web_server {
734        info!("Shutting down web server …");
735        it.initiate_shutdown();
736        if let Err(e) = it.join().await {
737            error!("Error waiting for web server to shut down: {e}");
738        }
739    }
740
741    if let Some(it) = tcp_server {
742        info!("Shutting down tcp server …");
743        it.initiate_shutdown();
744        if let Err(e) = it.join().await {
745            error!("Error waiting for tcp server to shut down: {e}");
746        }
747    }
748
749    if let Some(it) = unix_socket {
750        info!("Shutting down unix socket …");
751        it.initiate_shutdown();
752        if let Err(e) = it.join().await {
753            error!("Error waiting for unix socket to shut down: {e}");
754        }
755    }
756}
757
758async fn forward_api_call(
759    client_write_txs: &mut Vec<(usize, mpsc::Sender<ClientWriteCommand>)>,
760    dead: &mut Vec<usize>,
761    function: &WbFunction,
762    filter_sys: bool,
763) {
764    if let Some(cmd) = match function {
765        WbFunction::Get(_, _)
766        | WbFunction::CGet(_, _)
767        | WbFunction::SPubInit(_, _, _, _)
768        | WbFunction::SPub(_, _, _, _)
769        | WbFunction::Publish(_, _, _)
770        | WbFunction::Ls(_, _)
771        | WbFunction::PLs(_, _)
772        | WbFunction::PGet(_, _)
773        | WbFunction::Subscribe(_, _, _, _, _, _)
774        | WbFunction::PSubscribe(_, _, _, _, _, _)
775        | WbFunction::SubscribeLs(_, _, _, _)
776        | WbFunction::Unsubscribe(_, _, _)
777        | WbFunction::UnsubscribeLs(_, _, _)
778        | WbFunction::Connected(_, _, _, _)
779        | WbFunction::ProtocolSwitched(_, _)
780        | WbFunction::Disconnected(_, _)
781        | WbFunction::Config(_)
782        | WbFunction::Export(_, _)
783        | WbFunction::Import(_, _)
784        | WbFunction::Len(_)
785        | WbFunction::Lock(_, _, _)
786        | WbFunction::AcquireLock(_, _, _)
787        | WbFunction::ReleaseLock(_, _, _) => None,
788        WbFunction::Set(key, value, _, _, _) => {
789            if !filter_sys || !key.starts_with(SYSTEM_TOPIC_ROOT_PREFIX) {
790                Some(ClientWriteCommand::Set(key.to_owned(), value.to_owned()))
791            } else {
792                None
793            }
794        }
795        WbFunction::CSet(key, value, version, _, _) => {
796            if !filter_sys || !key.starts_with(SYSTEM_TOPIC_ROOT_PREFIX) {
797                Some(ClientWriteCommand::CSet(
798                    key.to_owned(),
799                    value.to_owned(),
800                    version.to_owned(),
801                ))
802            } else {
803                None
804            }
805        }
806        WbFunction::Delete(key, _, _) => {
807            if !filter_sys || !key.starts_with(SYSTEM_TOPIC_ROOT_PREFIX) {
808                Some(ClientWriteCommand::Delete(key.to_owned()))
809            } else {
810                None
811            }
812        }
813        WbFunction::PDelete(pattern, _, _) => {
814            if !filter_sys || !pattern.starts_with(SYSTEM_TOPIC_ROOT_PREFIX) {
815                Some(ClientWriteCommand::PDelete(pattern.to_owned()))
816            } else {
817                None
818            }
819        }
820    } {
821        forward_to_followers(cmd, client_write_txs, dead).await;
822    }
823}
824
825async fn forward_to_followers(
826    cmd: ClientWriteCommand,
827    client_write_txs: &mut Vec<(usize, mpsc::Sender<ClientWriteCommand>)>,
828    dead: &mut Vec<usize>,
829) {
830    for (id, tx) in client_write_txs.iter() {
831        if tx.send(cmd.clone()).await.is_err() {
832            dead.push(*id);
833        }
834    }
835    if !dead.is_empty() {
836        client_write_txs.retain(|(i, _)| !dead.contains(i));
837        dead.clear();
838    }
839}