worterbuch_client/
buffer.rs1use crate::Command;
21use std::{
22 collections::HashMap,
23 sync::{Arc, Mutex},
24 time::Duration,
25};
26use tokio::{
27 spawn,
28 sync::{mpsc, oneshot},
29 time::sleep,
30};
31use tracing::error;
32use worterbuch_common::{Key, KeyValuePair, Value, error::ConnectionResult};
33
34type Buffer = Arc<Mutex<HashMap<Key, Value>>>;
35
36const LOCK_MSG: &str = "the lock scope must not contain code that can panic!";
37
38#[derive(Clone)]
39pub struct SendBuffer {
40 delay: Duration,
41 set_tx: mpsc::Sender<KeyValuePair>,
42 publish_tx: mpsc::Sender<KeyValuePair>,
43 set_buffer: Buffer,
44 publish_buffer: Buffer,
45 commands: mpsc::Sender<Command>,
46}
47
48impl SendBuffer {
49 pub(crate) async fn new(commands: mpsc::Sender<Command>, delay: Duration) -> Self {
50 let (set_tx, set_rx) = mpsc::channel(1);
51 let (publish_tx, publish_rx) = mpsc::channel(1);
52
53 let set_buffer = Buffer::default();
54 let publish_buffer = Buffer::default();
55
56 let buf = Self {
57 delay,
58 set_tx,
59 publish_tx,
60 set_buffer,
61 publish_buffer,
62 commands,
63 };
64
65 spawn(buf.clone().buffer_set_messages(set_rx));
66 spawn(buf.clone().buffer_publish_messages(publish_rx));
67
68 buf
69 }
70
71 pub async fn set_later(&self, key: Key, value: Value) -> ConnectionResult<()> {
72 self.set_tx.send(KeyValuePair { key, value }).await?;
73 Ok(())
74 }
75
76 pub async fn publish_later(&self, key: Key, value: Value) -> ConnectionResult<()> {
77 self.publish_tx.send(KeyValuePair { key, value }).await?;
78 Ok(())
79 }
80
81 async fn buffer_set_messages(self, mut rx: mpsc::Receiver<KeyValuePair>) {
82 while let Some(KeyValuePair { key, value }) = rx.recv().await {
83 let previous = self
84 .set_buffer
85 .lock()
86 .expect(LOCK_MSG)
87 .insert(key.clone(), value);
88 if previous.is_none() {
89 spawn(self.clone().set_value(key));
90 }
91 }
92 }
93
94 async fn buffer_publish_messages(self, mut rx: mpsc::Receiver<KeyValuePair>) {
95 while let Some(KeyValuePair { key, value }) = rx.recv().await {
96 let previous = self
97 .set_buffer
98 .lock()
99 .expect(LOCK_MSG)
100 .insert(key.clone(), value);
101 if previous.is_none() {
102 spawn(self.clone().publish_value(key));
103 }
104 }
105 }
106
107 async fn set_value(self, key: Key) {
108 sleep(self.delay).await;
109 let value = self.set_buffer.lock().expect(LOCK_MSG).remove(&key);
110 if let Some(value) = value
111 && let Err(e) = self.do_set_value(key, value).await
112 {
113 error!("Error sending set message: {e}");
114 }
115 }
116
117 async fn do_set_value(&self, key: Key, value: Value) -> ConnectionResult<()> {
118 let (tx, rx) = oneshot::channel();
119 self.commands.send(Command::Set(key, value, tx)).await?;
120 rx.await.ok();
121 Ok(())
122 }
123
124 async fn publish_value(self, key: Key) {
125 sleep(self.delay).await;
126 let value = self.publish_buffer.lock().expect(LOCK_MSG).remove(&key);
127 if let Some(value) = value
128 && let Err(e) = self.do_publish_value(key, value).await
129 {
130 error!("Error sending publish message: {e}");
131 }
132 }
133
134 async fn do_publish_value(&self, key: Key, value: Value) -> ConnectionResult<()> {
135 let (tx, rx) = oneshot::channel();
136 self.commands.send(Command::Publish(key, value, tx)).await?;
137 rx.await.ok();
138 Ok(())
139 }
140}