reddb_server/runtime/
impl_events.rs1use std::collections::HashSet;
4
5use super::*;
6
7impl RedDBRuntime {
8 pub fn execute_events_backfill(
9 &self,
10 raw_query: &str,
11 query: &EventsBackfillQuery,
12 ) -> RedDBResult<RuntimeQueryResult> {
13 self.check_write(crate::runtime::write_gate::WriteKind::Dml)?;
14
15 let contract = self
16 .db()
17 .collection_contract_arc(&query.collection)
18 .ok_or_else(|| RedDBError::NotFound(query.collection.clone()))?;
19 if contract.declared_model == crate::catalog::CollectionModel::Queue {
20 return Err(RedDBError::Query(
21 "queues cannot be event backfill sources".to_string(),
22 ));
23 }
24
25 let subscription = contract
26 .subscriptions
27 .iter()
28 .find(|sub| sub.enabled && sub.target_queue == query.target_queue)
29 .cloned()
30 .ok_or_else(|| {
31 RedDBError::Query(format!(
32 "no enabled event subscription from '{}' to '{}'",
33 query.collection, query.target_queue
34 ))
35 })?;
36
37 let command_filter = match query.where_filter.as_deref() {
38 Some(sql) => Some(parse_backfill_filter(sql)?),
39 None => None,
40 };
41 let subscription_filter = subscription
42 .where_filter
43 .as_deref()
44 .and_then(parse_subscription_filter);
45 let tenant_column = self.tenant_column(&query.collection);
46 let rls_filter = if tenant_column.is_none()
47 && crate::runtime::impl_core::rls_is_enabled(self, &query.collection)
48 {
49 match crate::runtime::impl_core::rls_policy_filter(
50 self,
51 &query.collection,
52 crate::storage::query::ast::PolicyAction::Select,
53 ) {
54 Some(filter) => Some(filter),
55 None => return Ok(backfill_result(raw_query, 0, 0, 0, &query.target_queue)),
56 }
57 } else {
58 None
59 };
60
61 let store = self.inner.db.store();
62 let manager = store
63 .get_collection(&query.collection)
64 .ok_or_else(|| RedDBError::NotFound(query.collection.clone()))?;
65 let snap_ctx = crate::runtime::impl_core::capture_current_snapshot();
66 let mut entities = manager.query_all(|entity| {
67 crate::runtime::impl_core::entity_visible_with_context(snap_ctx.as_ref(), entity)
68 });
69 entities.sort_by_key(|entity| entity.id.raw());
70
71 let effective_queue = crate::runtime::mutation::effective_queue_name(&subscription);
72 let mut existing_event_ids = queue_event_ids(store.as_ref(), &effective_queue)?;
73 let subscription_id = subscription_identity(&subscription);
74 let mut matched = 0u64;
75 let mut enqueued = 0u64;
76 let mut skipped = 0u64;
77
78 for entity in entities {
79 if !row_matches_filter(self, &query.collection, &entity, command_filter.as_ref()) {
80 continue;
81 }
82 if !row_matches_filter(
83 self,
84 &query.collection,
85 &entity,
86 subscription_filter.as_ref(),
87 ) {
88 continue;
89 }
90 if !row_matches_tenant_scope(&query.collection, &entity, tenant_column.as_deref()) {
91 continue;
92 }
93 if !row_matches_filter(self, &query.collection, &entity, rls_filter.as_ref()) {
94 continue;
95 }
96
97 if query.limit.is_some_and(|limit| matched >= limit) {
98 break;
99 }
100 matched += 1;
101
102 let after = crate::runtime::mutation::entity_row_json(&entity);
103 let (event_id, payload) = crate::runtime::mutation::backfill_event_payload(
104 &query.collection,
105 entity.id.raw(),
106 &after,
107 &subscription_id,
108 subscription.redact_fields.as_slice(),
109 )?;
110 if existing_event_ids.contains(&event_id) {
111 skipped += 1;
112 continue;
113 }
114
115 self.enqueue_event_payload(&effective_queue, Value::Json(payload))?;
116 existing_event_ids.insert(event_id);
117 enqueued += 1;
118 }
119
120 self.invalidate_result_cache_for_table(&effective_queue);
121 Ok(backfill_result(
122 raw_query,
123 matched,
124 enqueued,
125 skipped,
126 &effective_queue,
127 ))
128 }
129}
130
131fn parse_backfill_filter(sql: &str) -> RedDBResult<Filter> {
132 crate::storage::query::Parser::new(sql)
133 .and_then(|mut parser| parser.parse_filter())
134 .map_err(|err| RedDBError::Query(format!("invalid EVENTS BACKFILL WHERE predicate: {err}")))
135}
136
137fn parse_subscription_filter(sql: &str) -> Option<Filter> {
138 crate::storage::query::Parser::new(sql)
139 .ok()
140 .and_then(|mut parser| parser.parse_filter().ok())
141}
142
143fn row_matches_filter(
144 runtime: &RedDBRuntime,
145 collection: &str,
146 entity: &UnifiedEntity,
147 filter: Option<&Filter>,
148) -> bool {
149 filter.is_none_or(|filter| {
150 crate::runtime::query_exec::evaluate_entity_filter_with_db(
151 Some(&runtime.inner.db),
152 entity,
153 filter,
154 collection,
155 collection,
156 )
157 })
158}
159
160fn row_matches_tenant_scope(
161 collection: &str,
162 entity: &UnifiedEntity,
163 tenant_column: Option<&str>,
164) -> bool {
165 let Some(tenant_column) = tenant_column else {
166 return true;
167 };
168 let Some(tenant) = crate::runtime::impl_core::current_tenant() else {
169 return true;
170 };
171 let row = crate::runtime::mutation::entity_row_json(entity);
172 json_path_string(&row, tenant_column).is_some_and(|value| value == tenant)
173 || json_path_string(&row, &format!("{collection}.{tenant_column}"))
174 .is_some_and(|value| value == tenant)
175}
176
177fn json_path_string<'a>(value: &'a crate::json::Value, path: &str) -> Option<&'a str> {
178 let mut current = value;
179 for part in path.split('.') {
180 current = current.get(part)?;
181 }
182 current.as_str()
183}
184
185fn subscription_identity(subscription: &crate::catalog::SubscriptionDescriptor) -> String {
186 if subscription.name.is_empty() {
187 format!("{}->{}", subscription.source, subscription.target_queue)
188 } else {
189 subscription.name.clone()
190 }
191}
192
193fn queue_event_ids(store: &UnifiedStore, queue: &str) -> RedDBResult<HashSet<String>> {
194 let Some(manager) = store.get_collection(queue) else {
195 return Ok(HashSet::new());
196 };
197 let mut ids = HashSet::new();
198 for entity in manager.query_all(|entity| matches!(entity.kind, EntityKind::QueueMessage { .. }))
199 {
200 let EntityData::QueueMessage(message) = entity.data else {
201 continue;
202 };
203 let Value::Json(bytes) = message.payload else {
204 continue;
205 };
206 let Ok(json) = crate::json::from_slice::<crate::json::Value>(&bytes) else {
207 continue;
208 };
209 if let Some(event_id) = json.get("event_id").and_then(|value| value.as_str()) {
210 ids.insert(event_id.to_string());
211 }
212 }
213 Ok(ids)
214}
215
216fn backfill_result(
217 raw_query: &str,
218 matched: u64,
219 enqueued: u64,
220 skipped: u64,
221 queue: &str,
222) -> RuntimeQueryResult {
223 let mut result = UnifiedResult::with_columns(vec![
224 "matched".into(),
225 "enqueued".into(),
226 "skipped".into(),
227 "queue".into(),
228 ]);
229 let mut record = UnifiedRecord::new();
230 record.set("matched", Value::UnsignedInteger(matched));
231 record.set("enqueued", Value::UnsignedInteger(enqueued));
232 record.set("skipped", Value::UnsignedInteger(skipped));
233 record.set("queue", Value::text(queue.to_string()));
234 result.push(record);
235
236 RuntimeQueryResult {
237 query: raw_query.to_string(),
238 mode: QueryMode::Sql,
239 statement: "events_backfill",
240 engine: "runtime-events",
241 result,
242 affected_rows: enqueued,
243 statement_type: "insert",
244 }
245}