apalis_sqlite/
callback.rs1use futures::channel::mpsc::{UnboundedReceiver, UnboundedSender};
2use futures::{Stream, StreamExt};
3use sqlx::sqlite::{SqliteOperation, UpdateHookResult};
4use std::pin::Pin;
5use std::sync::{Arc, Mutex};
6use std::task::{Context, Poll};
7
8#[derive(Debug)]
10pub struct DbEvent {
11 op: SqliteOperation,
12 db_name: String,
13 table_name: String,
14 rowid: i64,
15}
16
17impl DbEvent {
18 #[must_use]
20 pub fn operation(&self) -> &SqliteOperation {
21 &self.op
22 }
23
24 #[must_use]
26 pub fn db_name(&self) -> &str {
27 &self.db_name
28 }
29
30 #[must_use]
32 pub fn table_name(&self) -> &str {
33 &self.table_name
34 }
35
36 #[must_use]
38 pub fn rowid(&self) -> i64 {
39 self.rowid
40 }
41}
42
43pub(crate) fn update_hook_callback(event: UpdateHookResult<'_>, tx: &mut UnboundedSender<DbEvent>) {
45 tx.start_send(DbEvent {
46 op: event.operation,
47 db_name: event.database.to_owned(),
48 table_name: event.table.to_owned(),
49 rowid: event.rowid,
50 })
51 .unwrap();
52}
53
54#[derive(Debug, Clone)]
56pub struct HookCallbackListener {
57 rx: Arc<Mutex<UnboundedReceiver<DbEvent>>>,
58}
59
60impl HookCallbackListener {
61 #[must_use]
63 pub fn new(rx: UnboundedReceiver<DbEvent>) -> Self {
64 Self {
65 rx: Arc::new(Mutex::new(rx)),
66 }
67 }
68}
69
70impl Stream for HookCallbackListener {
71 type Item = DbEvent;
72
73 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
74 self.rx.lock().unwrap().poll_next_unpin(cx)
75 }
76}