cozo_ce/runtime/
callback.rs

1/*
2 * Copyright 2022, The Cozo Project Authors.
3 *
4 * This Source Code Form is subject to the terms of the Mozilla Public License, v. 2.0.
5 * If a copy of the MPL was not distributed with this file,
6 * You can obtain one at https://mozilla.org/MPL/2.0/.
7 */
8
9use std::collections::{BTreeMap, BTreeSet};
10use std::fmt::{Display, Formatter};
11
12use crossbeam::channel::Sender;
13use smartstring::{LazyCompact, SmartString};
14
15use crate::{Db, NamedRows, Storage};
16
/// The kind of mutation that caused a callback to fire.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CallbackOp {
    /// The callback was fired by a `Put` operation.
    Put,
    /// The callback was fired by an `Rm` operation.
    Rm,
}
25
26impl Display for CallbackOp {
27    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
28        match self {
29            CallbackOp::Put => f.write_str("Put"),
30            CallbackOp::Rm => f.write_str("Rm"),
31        }
32    }
33}
34
35impl CallbackOp {
36    /// Get the string representation
37    pub fn as_str(&self) -> &'static str {
38        match self {
39            CallbackOp::Put => "Put",
40            CallbackOp::Rm => "Rm",
41        }
42    }
43}
44
45#[allow(dead_code)]
46pub struct CallbackDeclaration {
47    pub(crate) dependent: SmartString<LazyCompact>,
48    pub(crate) sender: Sender<(CallbackOp, NamedRows, NamedRows)>,
49}
50
51pub(crate) type CallbackCollector =
52    BTreeMap<SmartString<LazyCompact>, Vec<(CallbackOp, NamedRows, NamedRows)>>;
53
54#[allow(dead_code)]
55pub(crate) type EventCallbackRegistry = (
56    BTreeMap<u32, CallbackDeclaration>,
57    BTreeMap<SmartString<LazyCompact>, BTreeSet<u32>>,
58);
59
60impl<'s, S: Storage<'s>> Db<S> {
61    pub(crate) fn current_callback_targets(&self) -> BTreeSet<SmartString<LazyCompact>> {
62        #[cfg(not(target_arch = "wasm32"))]
63        {
64            self.event_callbacks
65                .read()
66                .unwrap()
67                .1
68                .keys()
69                .cloned()
70                .collect()
71        }
72
73        #[cfg(target_arch = "wasm32")]
74        {
75            Default::default()
76        }
77    }
78    #[cfg(not(target_arch = "wasm32"))]
79    pub(crate) fn send_callbacks(&'s self, collector: CallbackCollector) {
80        let mut to_remove = vec![];
81
82        for (table, vals) in collector {
83            for (op, new, old) in vals {
84                let (cbs, cb_dir) = &*self.event_callbacks.read().unwrap();
85                if let Some(cb_ids) = cb_dir.get(&table) {
86                    let mut it = cb_ids.iter();
87                    if let Some(fst) = it.next() {
88                        for cb_id in it {
89                            if let Some(cb) = cbs.get(cb_id) {
90                                if cb.sender.send((op, new.clone(), old.clone())).is_err() {
91                                    to_remove.push(*cb_id)
92                                }
93                            }
94                        }
95
96                        if let Some(cb) = cbs.get(fst) {
97                            if cb.sender.send((op, new, old)).is_err() {
98                                to_remove.push(*fst)
99                            }
100                        }
101                    }
102                }
103            }
104        }
105
106        if !to_remove.is_empty() {
107            let (cbs, cb_dir) = &mut *self.event_callbacks.write().unwrap();
108            for removing_id in &to_remove {
109                if let Some(removed) = cbs.remove(removing_id) {
110                    if let Some(set) = cb_dir.get_mut(&removed.dependent) {
111                        set.remove(removing_id);
112                    }
113                }
114            }
115        }
116    }
117}