1use std::sync::Arc;
2
3use facet::Facet;
4use moire_trace_types::{BacktraceId, ModuleId, RelPc, RuntimeBase};
5use moire_types::{ConnectionId, ProcessId};
6use moire_wire::{BacktraceRecord, ModuleIdentity, ModuleManifestEntry};
7use rusqlite_facet::{ConnectionFacetExt, StatementFacetExt};
8
9use crate::db::Db;
10use crate::util::time::now_nanos;
11
12#[derive(Clone)]
13pub struct BacktraceFramePersist {
14 pub frame_index: u32,
15 pub rel_pc: RelPc,
16 pub module_path: String,
17 pub module_identity: String,
18}
19
20#[derive(Clone)]
21pub struct StoredModuleManifestEntry {
22 pub module_id: ModuleId,
23 pub module_path: String,
24 pub module_identity: String,
25 pub arch: String,
26 pub runtime_base: RuntimeBase,
27}
28
29#[derive(Facet)]
30struct ConnectionUpsertParams {
31 conn_id: ConnectionId,
32 process_id: ProcessId,
33 process_name: String,
34 pid: u32,
35 connected_at_ns: i64,
36}
37
38#[derive(Facet)]
39struct ConnectionClosedParams {
40 conn_id: ConnectionId,
41 disconnected_at_ns: i64,
42}
43
44#[derive(Facet)]
45struct ProcessIdParams {
46 process_id: ProcessId,
47}
48
49#[derive(Facet)]
50struct ConnectionModuleInsertParams {
51 process_id: ProcessId,
52 module_id: ModuleId,
53 module_index: i64,
54 module_path: String,
55 module_identity: String,
56 arch: String,
57 runtime_base: RuntimeBase,
58}
59
60#[derive(Facet)]
61struct BacktraceInsertParams {
62 process_id: ProcessId,
63 backtrace_id: BacktraceId,
64 frame_count: i64,
65 received_at_ns: i64,
66}
67
68#[derive(Facet)]
69struct BacktraceFrameInsertParams {
70 process_id: ProcessId,
71 backtrace_id: BacktraceId,
72 frame_index: u32,
73 module_path: String,
74 module_identity: String,
75 rel_pc: RelPc,
76}
77
78#[derive(Facet)]
79struct CutRequestParams {
80 cut_id: String,
81 requested_at_ns: i64,
82}
83
84#[derive(Facet)]
85struct CutAckParams {
86 cut_id: String,
87 process_id: ProcessId,
88 next_seq_no: u64,
89 received_at_ns: i64,
90}
91
92#[derive(Facet)]
93struct DeltaBatchInsertParams {
94 process_id: ProcessId,
95 from_seq_no: u64,
96 next_seq_no: u64,
97 truncated: i64,
98 compacted_before_seq_no: Option<u64>,
99 change_count: u64,
100 payload_json: String,
101 received_at_ns: i64,
102}
103
104#[derive(Facet)]
105struct UpsertEntityParams {
106 process_id: ProcessId,
107 entity_id: String,
108 entity_json: String,
109 updated_at_ns: i64,
110}
111
112#[derive(Facet)]
113struct UpsertScopeParams {
114 process_id: ProcessId,
115 scope_id: String,
116 scope_json: String,
117 updated_at_ns: i64,
118}
119
120#[derive(Facet)]
121struct UpsertEntityScopeLinkParams {
122 process_id: ProcessId,
123 entity_id: String,
124 scope_id: String,
125 updated_at_ns: i64,
126}
127
128#[derive(Facet)]
129struct RemoveEntityParams {
130 process_id: ProcessId,
131 entity_id: String,
132}
133
134#[derive(Facet)]
135struct RemoveScopeParams {
136 process_id: ProcessId,
137 scope_id: String,
138}
139
140#[derive(Facet)]
141struct RemoveEntityScopeLinkParams {
142 process_id: ProcessId,
143 entity_id: String,
144 scope_id: String,
145}
146
147#[derive(Facet)]
148struct UpsertEdgeParams {
149 process_id: ProcessId,
150 src_id: String,
151 dst_id: String,
152 kind_json: String,
153 edge_json: String,
154 updated_at_ns: i64,
155}
156
157#[derive(Facet)]
158struct RemoveEdgeParams {
159 process_id: ProcessId,
160 src_id: String,
161 dst_id: String,
162 kind_json: String,
163}
164
165#[derive(Facet)]
166struct AppendEventParams {
167 process_id: ProcessId,
168 seq_no: u64,
169 event_id: String,
170 event_json: String,
171 at_ms: u64,
172}
173
174#[derive(Facet)]
175struct StreamCursorUpsertParams {
176 process_id: ProcessId,
177 next_seq_no: u64,
178 updated_at_ns: i64,
179}
180
181pub fn backtrace_frames_for_store(
182 module_manifest: &[StoredModuleManifestEntry],
183 record: &BacktraceRecord,
184) -> Result<Vec<BacktraceFramePersist>, String> {
185 let modules_by_id = module_manifest
186 .iter()
187 .map(|module| (module.module_id, module))
188 .collect::<std::collections::BTreeMap<_, _>>();
189 let mut frames = Vec::with_capacity(record.frames.len());
190 for (frame_index, frame) in record.frames.iter().enumerate() {
191 let module_id = frame.module_id;
192 let Some(module) = modules_by_id.get(&module_id) else {
193 return Err(format!(
194 "invariant violated: backtrace frame {frame_index} references module_id {} but handshake manifest for this connection has no matching module id ({} entries)",
195 module_id,
196 modules_by_id.len()
197 ));
198 };
199 frames.push(BacktraceFramePersist {
200 frame_index: frame_index as u32,
201 rel_pc: frame.rel_pc,
202 module_path: module.module_path.clone(),
203 module_identity: module.module_identity.clone(),
204 });
205 }
206 Ok(frames)
207}
208
209pub fn into_stored_module_manifest(
210 module_manifest: Vec<ModuleManifestEntry>,
211) -> Vec<StoredModuleManifestEntry> {
212 module_manifest
213 .into_iter()
214 .map(|module| StoredModuleManifestEntry {
215 module_id: module.module_id,
216 module_path: module.module_path,
217 module_identity: module_identity_key(&module.identity),
218 arch: module.arch,
219 runtime_base: module.runtime_base,
220 })
221 .collect()
222}
223
224fn module_identity_key(identity: &ModuleIdentity) -> String {
225 match identity {
226 ModuleIdentity::BuildId(build_id) => format!("build_id:{build_id}"),
227 ModuleIdentity::DebugId(debug_id) => format!("debug_id:{debug_id}"),
228 }
229}
230
231pub async fn persist_connection_upsert(
232 db: Arc<Db>,
233 conn_id: ConnectionId,
234 process_id: ProcessId,
235 process_name: String,
236 pid: u32,
237) -> Result<(), String> {
238 tokio::task::spawn_blocking(move || {
239 let conn = db.open()?;
240 conn.facet_execute_ref(
241 "INSERT INTO connections (conn_id, process_id, process_name, pid, connected_at_ns, disconnected_at_ns)
242 VALUES (:conn_id, :process_id, :process_name, :pid, :connected_at_ns, NULL)
243 ON CONFLICT(conn_id) DO UPDATE SET
244 process_id = excluded.process_id,
245 process_name = excluded.process_name,
246 pid = excluded.pid",
247 &ConnectionUpsertParams {
248 conn_id,
249 process_id,
250 process_name,
251 pid,
252 connected_at_ns: now_nanos(),
253 },
254 )
255 .map_err(|error| format!("upsert connection: {error}"))?;
256 Ok::<(), String>(())
257 })
258 .await
259 .map_err(|error| format!("join sqlite: {error}"))?
260}
261
262pub async fn persist_connection_closed(db: Arc<Db>, conn_id: ConnectionId) -> Result<(), String> {
263 tokio::task::spawn_blocking(move || {
264 let conn = db.open()?;
265 conn.facet_execute_ref(
266 "UPDATE connections
267 SET disconnected_at_ns = :disconnected_at_ns
268 WHERE conn_id = :conn_id",
269 &ConnectionClosedParams {
270 conn_id,
271 disconnected_at_ns: now_nanos(),
272 },
273 )
274 .map_err(|error| format!("close connection: {error}"))?;
275 Ok::<(), String>(())
276 })
277 .await
278 .map_err(|error| format!("join sqlite: {error}"))?
279}
280
281pub async fn persist_connection_module_manifest(
282 db: Arc<Db>,
283 process_id: ProcessId,
284 module_manifest: Vec<StoredModuleManifestEntry>,
285) -> Result<(), String> {
286 tokio::task::spawn_blocking(move || {
287 let mut conn = db.open()?;
288 let tx = conn
289 .transaction()
290 .map_err(|error| format!("start transaction: {error}"))?;
291 {
292 let mut delete_stmt = tx
293 .prepare("DELETE FROM connection_modules WHERE process_id = :process_id")
294 .map_err(|error| format!("prepare delete connection_modules: {error}"))?;
295 delete_stmt
296 .facet_execute_ref(&ProcessIdParams {
297 process_id: process_id.clone(),
298 })
299 .map_err(|error| format!("delete connection_modules: {error}"))?;
300 }
301
302 {
303 let mut insert_stmt = tx
304 .prepare(
305 "INSERT INTO connection_modules (
306 process_id, module_id, module_index, module_path, module_identity, arch, runtime_base
307 ) VALUES (
308 :process_id, :module_id, :module_index, :module_path, :module_identity, :arch, :runtime_base
309 )",
310 )
311 .map_err(|error| format!("prepare insert connection_modules: {error}"))?;
312 for (module_index, module) in module_manifest.iter().enumerate() {
313 insert_stmt
314 .facet_execute_ref(&ConnectionModuleInsertParams {
315 process_id: process_id.clone(),
316 module_id: module.module_id,
317 module_index: module_index as i64,
318 module_path: module.module_path.clone(),
319 module_identity: module.module_identity.clone(),
320 arch: module.arch.clone(),
321 runtime_base: module.runtime_base,
322 })
323 .map_err(|error| format!("insert connection_module[{module_index}]: {error}"))?;
324 }
325 }
326 tx.commit()
327 .map_err(|error| format!("commit connection_modules: {error}"))?;
328 Ok::<(), String>(())
329 })
330 .await
331 .map_err(|error| format!("join sqlite: {error}"))?
332}
333
334pub async fn persist_backtrace_record(
336 db: Arc<Db>,
337 process_id: ProcessId,
338 backtrace_id: BacktraceId,
339 frames: Vec<BacktraceFramePersist>,
340) -> Result<bool, String> {
341 tokio::task::spawn_blocking(move || {
342 let mut conn = db.open()?;
343 let tx = conn
344 .transaction()
345 .map_err(|error| format!("start transaction: {error}"))?;
346 let inserted = {
347 let mut insert_backtrace_stmt = tx
348 .prepare(
349 "INSERT INTO backtraces (process_id, backtrace_id, frame_count, received_at_ns)
350 VALUES (:process_id, :backtrace_id, :frame_count, :received_at_ns)
351 ON CONFLICT(backtrace_id) DO NOTHING",
352 )
353 .map_err(|error| format!("prepare insert backtrace: {error}"))?;
354 insert_backtrace_stmt
355 .facet_execute_ref(&BacktraceInsertParams {
356 process_id: process_id.clone(),
357 backtrace_id,
358 frame_count: frames.len() as i64,
359 received_at_ns: now_nanos(),
360 })
361 .map_err(|error| format!("insert backtrace: {error}"))?
362 > 0
363 };
364 if inserted {
365 {
366 let mut insert_frame_stmt = tx
367 .prepare(
368 "INSERT INTO backtrace_frames (
369 process_id, backtrace_id, frame_index, module_path, module_identity, rel_pc
370 ) VALUES (
371 :process_id, :backtrace_id, :frame_index, :module_path, :module_identity, :rel_pc
372 )",
373 )
374 .map_err(|error| format!("prepare insert backtrace frames: {error}"))?;
375 for frame in &frames {
376 insert_frame_stmt
377 .facet_execute_ref(&BacktraceFrameInsertParams {
378 process_id: process_id.clone(),
379 backtrace_id,
380 frame_index: frame.frame_index,
381 module_path: frame.module_path.clone(),
382 module_identity: frame.module_identity.clone(),
383 rel_pc: frame.rel_pc,
384 })
385 .map_err(|error| {
386 format!(
387 "insert backtrace frame {}/{}: {error}",
388 frame.frame_index,
389 backtrace_id
390 )
391 })?;
392 }
393 }
394 }
395 tx.commit()
396 .map_err(|error| format!("commit backtrace record: {error}"))?;
397 Ok::<bool, String>(inserted)
398 })
399 .await
400 .map_err(|error| format!("join sqlite: {error}"))?
401}
402
403pub async fn persist_cut_request(
404 db: Arc<Db>,
405 cut_id: String,
406 requested_at_ns: i64,
407) -> Result<(), String> {
408 tokio::task::spawn_blocking(move || {
409 let conn = db.open()?;
410 conn.facet_execute_ref(
411 "INSERT INTO cuts (cut_id, requested_at_ns) VALUES (?1, ?2)
412 ON CONFLICT(cut_id) DO UPDATE SET requested_at_ns = excluded.requested_at_ns",
413 &CutRequestParams {
414 cut_id,
415 requested_at_ns,
416 },
417 )
418 .map_err(|error| format!("upsert cut: {error}"))?;
419 Ok::<(), String>(())
420 })
421 .await
422 .map_err(|error| format!("join sqlite: {error}"))?
423}
424
425pub async fn persist_cut_ack(
426 db: Arc<Db>,
427 cut_id: String,
428 process_id: ProcessId,
429 stream_id: String,
430 next_seq_no: u64,
431) -> Result<(), String> {
432 tokio::task::spawn_blocking(move || {
433 if stream_id != process_id.as_str() {
434 return Err(format!(
435 "invariant violated: cut ack stream_id '{}' does not match process_id '{}'",
436 stream_id,
437 process_id.as_str()
438 ));
439 }
440 let conn = db.open()?;
441 conn.facet_execute_ref(
442 "INSERT INTO cut_acks (cut_id, process_id, next_seq_no, received_at_ns)
443 VALUES (?1, ?2, ?3, ?4)
444 ON CONFLICT(cut_id, process_id) DO UPDATE SET
445 next_seq_no = excluded.next_seq_no,
446 received_at_ns = excluded.received_at_ns",
447 &CutAckParams {
448 cut_id,
449 process_id,
450 next_seq_no,
451 received_at_ns: now_nanos(),
452 },
453 )
454 .map_err(|error| format!("upsert cut ack: {error}"))?;
455 Ok::<(), String>(())
456 })
457 .await
458 .map_err(|error| format!("join sqlite: {error}"))?
459}
460
461pub async fn persist_delta_batch(
462 db: Arc<Db>,
463 process_id: ProcessId,
464 batch: moire_types::PullChangesResponse,
465) -> Result<(), String> {
466 tokio::task::spawn_blocking(move || persist_delta_batch_blocking(&db, process_id, &batch))
467 .await
468 .map_err(|error| format!("join sqlite: {error}"))?
469}
470
471fn persist_delta_batch_blocking(
472 db: &Db,
473 process_id: ProcessId,
474 batch: &moire_types::PullChangesResponse,
475) -> Result<(), String> {
476 use moire_types::Change;
477
478 let mut conn = db.open()?;
479 let tx = conn
480 .transaction()
481 .map_err(|error| format!("start transaction: {error}"))?;
482 if batch.stream_id.0.as_str() != process_id.as_str() {
483 return Err(format!(
484 "invariant violated: delta batch stream_id '{}' does not match process_id '{}'",
485 batch.stream_id.0,
486 process_id.as_str()
487 ));
488 }
489 let received_at_ns = now_nanos();
490 let payload_json =
491 facet_json::to_string(batch).map_err(|error| format!("encode batch: {error}"))?;
492
493 {
494 let mut insert_delta_batch_stmt = tx
495 .prepare(
496 "INSERT INTO delta_batches (
497 process_id, from_seq_no, next_seq_no, truncated,
498 compacted_before_seq_no, change_count, payload_json, received_at_ns
499 ) VALUES (
500 :process_id, :from_seq_no, :next_seq_no, :truncated,
501 :compacted_before_seq_no, :change_count, :payload_json, :received_at_ns
502 )",
503 )
504 .map_err(|error| format!("prepare delta batch insert: {error}"))?;
505 insert_delta_batch_stmt
506 .facet_execute_ref(&DeltaBatchInsertParams {
507 process_id: process_id.clone(),
508 from_seq_no: batch.from_seq_no.0,
509 next_seq_no: batch.next_seq_no.0,
510 truncated: if batch.truncated { 1 } else { 0 },
511 compacted_before_seq_no: batch.compacted_before_seq_no.map(|seq_no| seq_no.0),
512 change_count: batch.changes.len() as u64,
513 payload_json,
514 received_at_ns,
515 })
516 .map_err(|error| format!("insert delta batch: {error}"))?;
517
518 let mut upsert_entity_stmt = tx
519 .prepare(
520 "INSERT INTO entities (process_id, entity_id, entity_json, updated_at_ns)
521 VALUES (:process_id, :entity_id, :entity_json, :updated_at_ns)
522 ON CONFLICT(entity_id) DO UPDATE SET
523 process_id = excluded.process_id,
524 entity_json = excluded.entity_json,
525 updated_at_ns = excluded.updated_at_ns",
526 )
527 .map_err(|error| format!("prepare entity upsert: {error}"))?;
528 let mut upsert_scope_stmt = tx
529 .prepare(
530 "INSERT INTO scopes (process_id, scope_id, scope_json, updated_at_ns)
531 VALUES (:process_id, :scope_id, :scope_json, :updated_at_ns)
532 ON CONFLICT(scope_id) DO UPDATE SET
533 process_id = excluded.process_id,
534 scope_json = excluded.scope_json,
535 updated_at_ns = excluded.updated_at_ns",
536 )
537 .map_err(|error| format!("prepare scope upsert: {error}"))?;
538 let mut upsert_entity_scope_link_stmt = tx
539 .prepare(
540 "INSERT INTO entity_scope_links (process_id, entity_id, scope_id, updated_at_ns)
541 VALUES (:process_id, :entity_id, :scope_id, :updated_at_ns)
542 ON CONFLICT(entity_id, scope_id) DO UPDATE SET
543 process_id = excluded.process_id,
544 updated_at_ns = excluded.updated_at_ns",
545 )
546 .map_err(|error| format!("prepare entity_scope_link upsert: {error}"))?;
547 let mut delete_entity_stmt = tx
548 .prepare(
549 "DELETE FROM entities
550 WHERE process_id = :process_id AND entity_id = :entity_id",
551 )
552 .map_err(|error| format!("prepare delete entity: {error}"))?;
553 let mut delete_entity_scope_links_for_entity_stmt = tx
554 .prepare(
555 "DELETE FROM entity_scope_links
556 WHERE process_id = :process_id AND entity_id = :entity_id",
557 )
558 .map_err(|error| format!("prepare delete entity_scope_links for entity: {error}"))?;
559 let mut delete_incident_edges_stmt = tx
560 .prepare(
561 "DELETE FROM edges
562 WHERE process_id = :process_id
563 AND (src_id = :entity_id OR dst_id = :entity_id)",
564 )
565 .map_err(|error| format!("prepare delete incident edges: {error}"))?;
566 let mut delete_scope_stmt = tx
567 .prepare(
568 "DELETE FROM scopes
569 WHERE process_id = :process_id AND scope_id = :scope_id",
570 )
571 .map_err(|error| format!("prepare delete scope: {error}"))?;
572 let mut delete_entity_scope_links_for_scope_stmt = tx
573 .prepare(
574 "DELETE FROM entity_scope_links
575 WHERE process_id = :process_id AND scope_id = :scope_id",
576 )
577 .map_err(|error| format!("prepare delete entity_scope_links for scope: {error}"))?;
578 let mut delete_entity_scope_link_stmt = tx
579 .prepare(
580 "DELETE FROM entity_scope_links
581 WHERE process_id = :process_id
582 AND entity_id = :entity_id AND scope_id = :scope_id",
583 )
584 .map_err(|error| format!("prepare delete entity_scope_link: {error}"))?;
585 let mut upsert_edge_stmt = tx
586 .prepare(
587 "INSERT INTO edges (process_id, src_id, dst_id, kind_json, edge_json, updated_at_ns)
588 VALUES (:process_id, :src_id, :dst_id, :kind_json, :edge_json, :updated_at_ns)
589 ON CONFLICT(src_id, dst_id, kind_json) DO UPDATE SET
590 process_id = excluded.process_id,
591 edge_json = excluded.edge_json,
592 updated_at_ns = excluded.updated_at_ns",
593 )
594 .map_err(|error| format!("prepare edge upsert: {error}"))?;
595 let mut delete_edge_stmt = tx
596 .prepare(
597 "DELETE FROM edges
598 WHERE process_id = :process_id
599 AND src_id = :src_id AND dst_id = :dst_id AND kind_json = :kind_json",
600 )
601 .map_err(|error| format!("prepare delete edge: {error}"))?;
602 let mut append_event_stmt = tx
603 .prepare(
604 "INSERT OR REPLACE INTO events (process_id, seq_no, event_id, event_json, at_ms)
605 VALUES (:process_id, :seq_no, :event_id, :event_json, :at_ms)",
606 )
607 .map_err(|error| format!("prepare append event: {error}"))?;
608
609 for stamped in &batch.changes {
610 match &stamped.change {
611 Change::UpsertEntity(entity) => {
612 let entity_json = facet_json::to_string(entity)
613 .map_err(|error| format!("encode entity: {error}"))?;
614 upsert_entity_stmt
615 .facet_execute_ref(&UpsertEntityParams {
616 process_id: process_id.clone(),
617 entity_id: entity.id.as_str().to_string(),
618 entity_json,
619 updated_at_ns: received_at_ns,
620 })
621 .map_err(|error| format!("upsert entity: {error}"))?;
622 }
623 Change::UpsertScope(scope) => {
624 let scope_json = facet_json::to_string(scope)
625 .map_err(|error| format!("encode scope: {error}"))?;
626 upsert_scope_stmt
627 .facet_execute_ref(&UpsertScopeParams {
628 process_id: process_id.clone(),
629 scope_id: scope.id.as_str().to_string(),
630 scope_json,
631 updated_at_ns: received_at_ns,
632 })
633 .map_err(|error| format!("upsert scope: {error}"))?;
634 }
635 Change::UpsertEntityScopeLink {
636 entity_id,
637 scope_id,
638 } => {
639 upsert_entity_scope_link_stmt
640 .facet_execute_ref(&UpsertEntityScopeLinkParams {
641 process_id: process_id.clone(),
642 entity_id: entity_id.as_str().to_string(),
643 scope_id: scope_id.as_str().to_string(),
644 updated_at_ns: received_at_ns,
645 })
646 .map_err(|error| format!("upsert entity_scope_link: {error}"))?;
647 }
648 Change::RemoveEntity { id } => {
649 let params = RemoveEntityParams {
650 process_id: process_id.clone(),
651 entity_id: id.as_str().to_string(),
652 };
653 delete_entity_stmt
654 .facet_execute_ref(¶ms)
655 .map_err(|error| format!("delete entity: {error}"))?;
656 delete_entity_scope_links_for_entity_stmt
657 .facet_execute_ref(¶ms)
658 .map_err(|error| {
659 format!("delete entity_scope_links for entity: {error}")
660 })?;
661 delete_incident_edges_stmt
662 .facet_execute_ref(¶ms)
663 .map_err(|error| format!("delete incident edges: {error}"))?;
664 }
665 Change::RemoveScope { id } => {
666 let params = RemoveScopeParams {
667 process_id: process_id.clone(),
668 scope_id: id.as_str().to_string(),
669 };
670 delete_scope_stmt
671 .facet_execute_ref(¶ms)
672 .map_err(|error| format!("delete scope: {error}"))?;
673 delete_entity_scope_links_for_scope_stmt
674 .facet_execute_ref(¶ms)
675 .map_err(|error| format!("delete entity_scope_links for scope: {error}"))?;
676 }
677 Change::RemoveEntityScopeLink {
678 entity_id,
679 scope_id,
680 } => {
681 delete_entity_scope_link_stmt
682 .facet_execute_ref(&RemoveEntityScopeLinkParams {
683 process_id: process_id.clone(),
684 entity_id: entity_id.as_str().to_string(),
685 scope_id: scope_id.as_str().to_string(),
686 })
687 .map_err(|error| format!("delete entity_scope_link: {error}"))?;
688 }
689 Change::UpsertEdge(edge) => {
690 let kind_json = facet_json::to_string(&edge.kind)
691 .map_err(|error| format!("encode edge kind: {error}"))?;
692 let edge_json = facet_json::to_string(edge)
693 .map_err(|error| format!("encode edge: {error}"))?;
694 upsert_edge_stmt
695 .facet_execute_ref(&UpsertEdgeParams {
696 process_id: process_id.clone(),
697 src_id: edge.src.as_str().to_string(),
698 dst_id: edge.dst.as_str().to_string(),
699 kind_json,
700 edge_json,
701 updated_at_ns: received_at_ns,
702 })
703 .map_err(|error| format!("upsert edge: {error}"))?;
704 }
705 Change::RemoveEdge { src, dst, kind } => {
706 let kind_json = facet_json::to_string(kind)
707 .map_err(|error| format!("encode edge kind: {error}"))?;
708 delete_edge_stmt
709 .facet_execute_ref(&RemoveEdgeParams {
710 process_id: process_id.clone(),
711 src_id: src.as_str().to_string(),
712 dst_id: dst.as_str().to_string(),
713 kind_json,
714 })
715 .map_err(|error| format!("delete edge: {error}"))?;
716 }
717 Change::AppendEvent(event) => {
718 let event_json = facet_json::to_string(event)
719 .map_err(|error| format!("encode event: {error}"))?;
720 append_event_stmt
721 .facet_execute_ref(&AppendEventParams {
722 process_id: process_id.clone(),
723 seq_no: stamped.seq_no.0,
724 event_id: event.id.as_str().to_string(),
725 event_json,
726 at_ms: event.at.as_millis(),
727 })
728 .map_err(|error| format!("append event: {error}"))?;
729 }
730 }
731 }
732
733 let mut upsert_stream_cursor_stmt = tx
734 .prepare(
735 "INSERT INTO stream_cursors (process_id, next_seq_no, updated_at_ns)
736 VALUES (:process_id, :next_seq_no, :updated_at_ns)
737 ON CONFLICT(process_id) DO UPDATE SET
738 next_seq_no = excluded.next_seq_no,
739 updated_at_ns = excluded.updated_at_ns",
740 )
741 .map_err(|error| format!("prepare stream cursor upsert: {error}"))?;
742 upsert_stream_cursor_stmt
743 .facet_execute_ref(&StreamCursorUpsertParams {
744 process_id: process_id.clone(),
745 next_seq_no: batch.next_seq_no.0,
746 updated_at_ns: received_at_ns,
747 })
748 .map_err(|error| format!("upsert stream cursor: {error}"))?;
749 }
750
751 tx.commit()
752 .map_err(|error| format!("commit transaction: {error}"))?;
753 Ok(())
754}