worterbuch_client/
buffer.rs

1/*
2 *  Worterbuch client send buffer module
3 *
4 *  Copyright (C) 2024 Michael Bachmann
5 *
6 *  This program is free software: you can redistribute it and/or modify
7 *  it under the terms of the GNU Affero General Public License as published by
8 *  the Free Software Foundation, either version 3 of the License, or
9 *  (at your option) any later version.
10 *
11 *  This program is distributed in the hope that it will be useful,
12 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14 *  GNU Affero General Public License for more details.
15 *
16 *  You should have received a copy of the GNU Affero General Public License
17 *  along with this program.  If not, see <https://www.gnu.org/licenses/>.
18 */
19
20use crate::Command;
21use std::{
22    collections::HashMap,
23    sync::{Arc, Mutex},
24    time::Duration,
25};
26use tokio::{
27    spawn,
28    sync::{mpsc, oneshot},
29    time::sleep,
30};
31use tracing::error;
32use worterbuch_common::{Key, KeyValuePair, Value, error::ConnectionResult};
33
34type Buffer = Arc<Mutex<HashMap<Key, Value>>>;
35
/// Panic message used when locking a [`Buffer`] fails.
///
/// Locking a `std::sync::Mutex` only fails when the mutex is poisoned, i.e.
/// when some thread panicked while holding the guard. The message states the
/// invariant that is supposed to rule this out.
const LOCK_MSG: &str = "the lock scope must not contain code that can panic!";
37
38#[derive(Clone)]
39pub struct SendBuffer {
40    delay: Duration,
41    set_tx: mpsc::Sender<KeyValuePair>,
42    publish_tx: mpsc::Sender<KeyValuePair>,
43    set_buffer: Buffer,
44    publish_buffer: Buffer,
45    commands: mpsc::Sender<Command>,
46}
47
48impl SendBuffer {
49    pub(crate) async fn new(commands: mpsc::Sender<Command>, delay: Duration) -> Self {
50        let (set_tx, set_rx) = mpsc::channel(1);
51        let (publish_tx, publish_rx) = mpsc::channel(1);
52
53        let set_buffer = Buffer::default();
54        let publish_buffer = Buffer::default();
55
56        let buf = Self {
57            delay,
58            set_tx,
59            publish_tx,
60            set_buffer,
61            publish_buffer,
62            commands,
63        };
64
65        spawn(buf.clone().buffer_set_messages(set_rx));
66        spawn(buf.clone().buffer_publish_messages(publish_rx));
67
68        buf
69    }
70
71    pub async fn set_later(&self, key: Key, value: Value) -> ConnectionResult<()> {
72        self.set_tx.send(KeyValuePair { key, value }).await?;
73        Ok(())
74    }
75
76    pub async fn publish_later(&self, key: Key, value: Value) -> ConnectionResult<()> {
77        self.publish_tx.send(KeyValuePair { key, value }).await?;
78        Ok(())
79    }
80
81    async fn buffer_set_messages(self, mut rx: mpsc::Receiver<KeyValuePair>) {
82        while let Some(KeyValuePair { key, value }) = rx.recv().await {
83            let previous = self
84                .set_buffer
85                .lock()
86                .expect(LOCK_MSG)
87                .insert(key.clone(), value);
88            if previous.is_none() {
89                spawn(self.clone().set_value(key));
90            }
91        }
92    }
93
94    async fn buffer_publish_messages(self, mut rx: mpsc::Receiver<KeyValuePair>) {
95        while let Some(KeyValuePair { key, value }) = rx.recv().await {
96            let previous = self
97                .set_buffer
98                .lock()
99                .expect(LOCK_MSG)
100                .insert(key.clone(), value);
101            if previous.is_none() {
102                spawn(self.clone().publish_value(key));
103            }
104        }
105    }
106
107    async fn set_value(self, key: Key) {
108        sleep(self.delay).await;
109        let value = self.set_buffer.lock().expect(LOCK_MSG).remove(&key);
110        if let Some(value) = value
111            && let Err(e) = self.do_set_value(key, value).await
112        {
113            error!("Error sending set message: {e}");
114        }
115    }
116
117    async fn do_set_value(&self, key: Key, value: Value) -> ConnectionResult<()> {
118        let (tx, rx) = oneshot::channel();
119        self.commands.send(Command::Set(key, value, tx)).await?;
120        rx.await.ok();
121        Ok(())
122    }
123
124    async fn publish_value(self, key: Key) {
125        sleep(self.delay).await;
126        let value = self.publish_buffer.lock().expect(LOCK_MSG).remove(&key);
127        if let Some(value) = value
128            && let Err(e) = self.do_publish_value(key, value).await
129        {
130            error!("Error sending publish message: {e}");
131        }
132    }
133
134    async fn do_publish_value(&self, key: Key, value: Value) -> ConnectionResult<()> {
135        let (tx, rx) = oneshot::channel();
136        self.commands.send(Command::Publish(key, value, tx)).await?;
137        rx.await.ok();
138        Ok(())
139    }
140}