Skip to main content

reddb_server/runtime/
impl_events.rs

1//! Event operator execution.
2
3use std::collections::HashSet;
4
5use super::*;
6
7impl RedDBRuntime {
8    pub fn execute_events_backfill(
9        &self,
10        raw_query: &str,
11        query: &EventsBackfillQuery,
12    ) -> RedDBResult<RuntimeQueryResult> {
13        self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
14
15        let contract = self
16            .db()
17            .collection_contract_arc(&query.collection)
18            .ok_or_else(|| RedDBError::NotFound(query.collection.clone()))?;
19        if contract.declared_model == crate::catalog::CollectionModel::Queue {
20            return Err(RedDBError::Query(
21                "queues cannot be event backfill sources".to_string(),
22            ));
23        }
24
25        let subscription = contract
26            .subscriptions
27            .iter()
28            .find(|sub| sub.enabled && sub.target_queue == query.target_queue)
29            .cloned()
30            .ok_or_else(|| {
31                RedDBError::Query(format!(
32                    "no enabled event subscription from '{}' to '{}'",
33                    query.collection, query.target_queue
34                ))
35            })?;
36
37        let command_filter = match query.where_filter.as_deref() {
38            Some(sql) => Some(parse_backfill_filter(sql)?),
39            None => None,
40        };
41        let subscription_filter = subscription
42            .where_filter
43            .as_deref()
44            .and_then(parse_subscription_filter);
45        let tenant_column = self.tenant_column(&query.collection);
46        let rls_filter = if tenant_column.is_none()
47            && crate::runtime::impl_core::rls_is_enabled(self, &query.collection)
48        {
49            match crate::runtime::impl_core::rls_policy_filter(
50                self,
51                &query.collection,
52                crate::storage::query::ast::PolicyAction::Select,
53            ) {
54                Some(filter) => Some(filter),
55                None => return Ok(backfill_result(raw_query, 0, 0, 0, &query.target_queue)),
56            }
57        } else {
58            None
59        };
60
61        let store = self.inner.db.store();
62        let manager = store
63            .get_collection(&query.collection)
64            .ok_or_else(|| RedDBError::NotFound(query.collection.clone()))?;
65        let snap_ctx = crate::runtime::impl_core::capture_current_snapshot();
66        let mut entities = manager.query_all(|entity| {
67            crate::runtime::impl_core::entity_visible_with_context(snap_ctx.as_ref(), entity)
68        });
69        entities.sort_by_key(|entity| entity.id.raw());
70
71        let effective_queue = crate::runtime::mutation::effective_queue_name(&subscription);
72        let mut existing_event_ids = queue_event_ids(store.as_ref(), &effective_queue)?;
73        let subscription_id = subscription_identity(&subscription);
74        let mut matched = 0u64;
75        let mut enqueued = 0u64;
76        let mut skipped = 0u64;
77
78        for entity in entities {
79            if !row_matches_filter(self, &query.collection, &entity, command_filter.as_ref()) {
80                continue;
81            }
82            if !row_matches_filter(
83                self,
84                &query.collection,
85                &entity,
86                subscription_filter.as_ref(),
87            ) {
88                continue;
89            }
90            if !row_matches_tenant_scope(&query.collection, &entity, tenant_column.as_deref()) {
91                continue;
92            }
93            if !row_matches_filter(self, &query.collection, &entity, rls_filter.as_ref()) {
94                continue;
95            }
96
97            if query.limit.is_some_and(|limit| matched >= limit) {
98                break;
99            }
100            matched += 1;
101
102            let after = crate::runtime::mutation::entity_row_json(&entity);
103            let (event_id, payload) = crate::runtime::mutation::backfill_event_payload(
104                &query.collection,
105                entity.id.raw(),
106                &after,
107                &subscription_id,
108                subscription.redact_fields.as_slice(),
109            )?;
110            if existing_event_ids.contains(&event_id) {
111                skipped += 1;
112                continue;
113            }
114
115            self.enqueue_event_payload(&effective_queue, Value::Json(payload))?;
116            existing_event_ids.insert(event_id);
117            enqueued += 1;
118        }
119
120        self.invalidate_result_cache_for_table(&effective_queue);
121        Ok(backfill_result(
122            raw_query,
123            matched,
124            enqueued,
125            skipped,
126            &effective_queue,
127        ))
128    }
129}
130
131fn parse_backfill_filter(sql: &str) -> RedDBResult<Filter> {
132    crate::storage::query::Parser::new(sql)
133        .and_then(|mut parser| parser.parse_filter())
134        .map_err(|err| RedDBError::Query(format!("invalid EVENTS BACKFILL WHERE predicate: {err}")))
135}
136
137fn parse_subscription_filter(sql: &str) -> Option<Filter> {
138    crate::storage::query::Parser::new(sql)
139        .ok()
140        .and_then(|mut parser| parser.parse_filter().ok())
141}
142
143fn row_matches_filter(
144    runtime: &RedDBRuntime,
145    collection: &str,
146    entity: &UnifiedEntity,
147    filter: Option<&Filter>,
148) -> bool {
149    filter.is_none_or(|filter| {
150        crate::runtime::query_exec::evaluate_entity_filter_with_db(
151            Some(&runtime.inner.db),
152            entity,
153            filter,
154            collection,
155            collection,
156        )
157    })
158}
159
160fn row_matches_tenant_scope(
161    collection: &str,
162    entity: &UnifiedEntity,
163    tenant_column: Option<&str>,
164) -> bool {
165    let Some(tenant_column) = tenant_column else {
166        return true;
167    };
168    let Some(tenant) = crate::runtime::impl_core::current_tenant() else {
169        return true;
170    };
171    let row = crate::runtime::mutation::entity_row_json(entity);
172    json_path_string(&row, tenant_column).is_some_and(|value| value == tenant)
173        || json_path_string(&row, &format!("{collection}.{tenant_column}"))
174            .is_some_and(|value| value == tenant)
175}
176
177fn json_path_string<'a>(value: &'a crate::json::Value, path: &str) -> Option<&'a str> {
178    let mut current = value;
179    for part in path.split('.') {
180        current = current.get(part)?;
181    }
182    current.as_str()
183}
184
185fn subscription_identity(subscription: &crate::catalog::SubscriptionDescriptor) -> String {
186    if subscription.name.is_empty() {
187        format!("{}->{}", subscription.source, subscription.target_queue)
188    } else {
189        subscription.name.clone()
190    }
191}
192
193fn queue_event_ids(store: &UnifiedStore, queue: &str) -> RedDBResult<HashSet<String>> {
194    let Some(manager) = store.get_collection(queue) else {
195        return Ok(HashSet::new());
196    };
197    let mut ids = HashSet::new();
198    for entity in manager.query_all(|entity| matches!(entity.kind, EntityKind::QueueMessage { .. }))
199    {
200        let EntityData::QueueMessage(message) = entity.data else {
201            continue;
202        };
203        let Value::Json(bytes) = message.payload else {
204            continue;
205        };
206        let Ok(json) = crate::json::from_slice::<crate::json::Value>(&bytes) else {
207            continue;
208        };
209        if let Some(event_id) = json.get("event_id").and_then(|value| value.as_str()) {
210            ids.insert(event_id.to_string());
211        }
212    }
213    Ok(ids)
214}
215
216fn backfill_result(
217    raw_query: &str,
218    matched: u64,
219    enqueued: u64,
220    skipped: u64,
221    queue: &str,
222) -> RuntimeQueryResult {
223    let mut result = UnifiedResult::with_columns(vec![
224        "matched".into(),
225        "enqueued".into(),
226        "skipped".into(),
227        "queue".into(),
228    ]);
229    let mut record = UnifiedRecord::new();
230    record.set("matched", Value::UnsignedInteger(matched));
231    record.set("enqueued", Value::UnsignedInteger(enqueued));
232    record.set("skipped", Value::UnsignedInteger(skipped));
233    record.set("queue", Value::text(queue.to_string()));
234    result.push(record);
235
236    RuntimeQueryResult {
237        query: raw_query.to_string(),
238        mode: QueryMode::Sql,
239        statement: "events_backfill",
240        engine: "runtime-events",
241        result,
242        affected_rows: enqueued,
243        statement_type: "insert",
244    }
245}