cozo_ce/runtime/
callback.rs1use std::collections::{BTreeMap, BTreeSet};
10use std::fmt::{Display, Formatter};
11
12use crossbeam::channel::Sender;
13use smartstring::{LazyCompact, SmartString};
14
15use crate::{Db, NamedRows, Storage};
16
/// Operation tag carried as the first element of every callback message.
#[derive(Copy, Clone, Debug, Eq, PartialEq)]
pub enum CallbackOp {
    /// Rendered as `"Put"`; presumably marks rows that were put into a relation.
    Put,
    /// Rendered as `"Rm"`; presumably marks rows that were removed from a relation.
    Rm,
}

impl Display for CallbackOp {
    /// Writes the same text as [`CallbackOp::as_str`].
    ///
    /// The text goes through [`Formatter::pad`] rather than `write_str`, so the
    /// width, fill, alignment and precision flags of the format spec are applied
    /// (e.g. `format!("{:>5}", CallbackOp::Put)` yields `"  Put"`). With a plain
    /// `{}` the output is exactly the variant name.
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        // Single source of truth for the names lives in `as_str`.
        f.pad(self.as_str())
    }
}

impl CallbackOp {
    /// Returns the static name of the variant: `"Put"` or `"Rm"`.
    pub fn as_str(&self) -> &'static str {
        match self {
            CallbackOp::Put => "Put",
            CallbackOp::Rm => "Rm",
        }
    }
}
44
45#[allow(dead_code)]
46pub struct CallbackDeclaration {
47 pub(crate) dependent: SmartString<LazyCompact>,
48 pub(crate) sender: Sender<(CallbackOp, NamedRows, NamedRows)>,
49}
50
51pub(crate) type CallbackCollector =
52 BTreeMap<SmartString<LazyCompact>, Vec<(CallbackOp, NamedRows, NamedRows)>>;
53
54#[allow(dead_code)]
55pub(crate) type EventCallbackRegistry = (
56 BTreeMap<u32, CallbackDeclaration>,
57 BTreeMap<SmartString<LazyCompact>, BTreeSet<u32>>,
58);
59
60impl<'s, S: Storage<'s>> Db<S> {
61 pub(crate) fn current_callback_targets(&self) -> BTreeSet<SmartString<LazyCompact>> {
62 #[cfg(not(target_arch = "wasm32"))]
63 {
64 self.event_callbacks
65 .read()
66 .unwrap()
67 .1
68 .keys()
69 .cloned()
70 .collect()
71 }
72
73 #[cfg(target_arch = "wasm32")]
74 {
75 Default::default()
76 }
77 }
78 #[cfg(not(target_arch = "wasm32"))]
79 pub(crate) fn send_callbacks(&'s self, collector: CallbackCollector) {
80 let mut to_remove = vec![];
81
82 for (table, vals) in collector {
83 for (op, new, old) in vals {
84 let (cbs, cb_dir) = &*self.event_callbacks.read().unwrap();
85 if let Some(cb_ids) = cb_dir.get(&table) {
86 let mut it = cb_ids.iter();
87 if let Some(fst) = it.next() {
88 for cb_id in it {
89 if let Some(cb) = cbs.get(cb_id) {
90 if cb.sender.send((op, new.clone(), old.clone())).is_err() {
91 to_remove.push(*cb_id)
92 }
93 }
94 }
95
96 if let Some(cb) = cbs.get(fst) {
97 if cb.sender.send((op, new, old)).is_err() {
98 to_remove.push(*fst)
99 }
100 }
101 }
102 }
103 }
104 }
105
106 if !to_remove.is_empty() {
107 let (cbs, cb_dir) = &mut *self.event_callbacks.write().unwrap();
108 for removing_id in &to_remove {
109 if let Some(removed) = cbs.remove(removing_id) {
110 if let Some(set) = cb_dir.get_mut(&removed.dependent) {
111 set.remove(removing_id);
112 }
113 }
114 }
115 }
116 }
117}