apalis_sqlite/
callback.rs1use futures::channel::mpsc::{self, UnboundedReceiver};
2use futures::{Stream, StreamExt};
3use std::ffi::{CStr, c_void};
4use std::os::raw::{c_char, c_int};
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
/// A single row-level change notification.
///
/// Instances are built by `update_hook_callback` from the arguments it
/// receives and are delivered to consumers through a channel.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DbEvent {
    /// Operation label: `"INSERT"`, `"UPDATE"`, `"DELETE"` or `"UNKNOWN"`.
    op: &'static str,
    /// Name of the database the change was reported for.
    db_name: String,
    /// Name of the table the change was reported for.
    table_name: String,
    /// Row id of the affected row.
    rowid: i64,
}

impl DbEvent {
    /// Returns the operation label (`"INSERT"`, `"UPDATE"`, `"DELETE"` or
    /// `"UNKNOWN"`).
    pub fn operation(&self) -> &'static str {
        self.op
    }

    /// Returns the name of the database the change was reported for.
    pub fn db_name(&self) -> &str {
        &self.db_name
    }

    /// Returns the name of the table the change was reported for.
    pub fn table_name(&self) -> &str {
        &self.table_name
    }

    /// Returns the row id of the affected row.
    pub fn rowid(&self) -> i64 {
        self.rowid
    }
}
33
34pub(crate) extern "C" fn update_hook_callback(
36 arg: *mut c_void,
37 op: c_int,
38 db_name: *const c_char,
39 table_name: *const c_char,
40 rowid: i64,
41) {
42 let op_str = match op {
43 libsqlite3_sys::SQLITE_INSERT => "INSERT",
44 libsqlite3_sys::SQLITE_UPDATE => "UPDATE",
45 libsqlite3_sys::SQLITE_DELETE => "DELETE",
46 _ => "UNKNOWN",
47 };
48
49 unsafe {
50 let db = CStr::from_ptr(db_name).to_string_lossy().to_string();
51 let table = CStr::from_ptr(table_name).to_string_lossy().to_string();
52
53 let tx = &mut *(arg as *mut mpsc::UnboundedSender<DbEvent>);
55
56 let _ = tx.start_send(DbEvent {
58 op: op_str,
59 db_name: db,
60 table_name: table,
61 rowid,
62 });
63 }
64}
65
/// Field-less marker type.
///
/// NOTE(review): nothing in this file uses it; presumably it is referenced
/// elsewhere in the crate to select the hook-based listener — confirm.
#[derive(Debug, Clone)]
pub struct HookCallbackListener;
68
69#[derive(Debug)]
70pub struct CallbackListener {
71 rx: UnboundedReceiver<DbEvent>,
72}
73impl CallbackListener {
74 pub fn new(rx: UnboundedReceiver<DbEvent>) -> Self {
75 Self { rx }
76 }
77}
78
79impl Stream for CallbackListener {
80 type Item = DbEvent;
81
82 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
83 self.rx.poll_next_unpin(cx)
84 }
85}