Skip to main content

moire_web/db/
persist.rs

1use std::sync::Arc;
2
3use facet::Facet;
4use moire_trace_types::{BacktraceId, ModuleId, RelPc, RuntimeBase};
5use moire_types::{ConnectionId, ProcessId};
6use moire_wire::{BacktraceRecord, ModuleIdentity, ModuleManifestEntry};
7use rusqlite_facet::{ConnectionFacetExt, StatementFacetExt};
8
9use crate::db::Db;
10use crate::util::time::now_nanos;
11
12#[derive(Clone)]
13pub struct BacktraceFramePersist {
14    pub frame_index: u32,
15    pub rel_pc: RelPc,
16    pub module_path: String,
17    pub module_identity: String,
18}
19
20#[derive(Clone)]
21pub struct StoredModuleManifestEntry {
22    pub module_id: ModuleId,
23    pub module_path: String,
24    pub module_identity: String,
25    pub arch: String,
26    pub runtime_base: RuntimeBase,
27}
28
29#[derive(Facet)]
30struct ConnectionUpsertParams {
31    conn_id: ConnectionId,
32    process_id: ProcessId,
33    process_name: String,
34    pid: u32,
35    connected_at_ns: i64,
36}
37
38#[derive(Facet)]
39struct ConnectionClosedParams {
40    conn_id: ConnectionId,
41    disconnected_at_ns: i64,
42}
43
44#[derive(Facet)]
45struct ProcessIdParams {
46    process_id: ProcessId,
47}
48
49#[derive(Facet)]
50struct ConnectionModuleInsertParams {
51    process_id: ProcessId,
52    module_id: ModuleId,
53    module_index: i64,
54    module_path: String,
55    module_identity: String,
56    arch: String,
57    runtime_base: RuntimeBase,
58}
59
60#[derive(Facet)]
61struct BacktraceInsertParams {
62    process_id: ProcessId,
63    backtrace_id: BacktraceId,
64    frame_count: i64,
65    received_at_ns: i64,
66}
67
68#[derive(Facet)]
69struct BacktraceFrameInsertParams {
70    process_id: ProcessId,
71    backtrace_id: BacktraceId,
72    frame_index: u32,
73    module_path: String,
74    module_identity: String,
75    rel_pc: RelPc,
76}
77
78#[derive(Facet)]
79struct CutRequestParams {
80    cut_id: String,
81    requested_at_ns: i64,
82}
83
84#[derive(Facet)]
85struct CutAckParams {
86    cut_id: String,
87    process_id: ProcessId,
88    next_seq_no: u64,
89    received_at_ns: i64,
90}
91
92#[derive(Facet)]
93struct DeltaBatchInsertParams {
94    process_id: ProcessId,
95    from_seq_no: u64,
96    next_seq_no: u64,
97    truncated: i64,
98    compacted_before_seq_no: Option<u64>,
99    change_count: u64,
100    payload_json: String,
101    received_at_ns: i64,
102}
103
104#[derive(Facet)]
105struct UpsertEntityParams {
106    process_id: ProcessId,
107    entity_id: String,
108    entity_json: String,
109    updated_at_ns: i64,
110}
111
112#[derive(Facet)]
113struct UpsertScopeParams {
114    process_id: ProcessId,
115    scope_id: String,
116    scope_json: String,
117    updated_at_ns: i64,
118}
119
120#[derive(Facet)]
121struct UpsertEntityScopeLinkParams {
122    process_id: ProcessId,
123    entity_id: String,
124    scope_id: String,
125    updated_at_ns: i64,
126}
127
128#[derive(Facet)]
129struct RemoveEntityParams {
130    process_id: ProcessId,
131    entity_id: String,
132}
133
134#[derive(Facet)]
135struct RemoveScopeParams {
136    process_id: ProcessId,
137    scope_id: String,
138}
139
140#[derive(Facet)]
141struct RemoveEntityScopeLinkParams {
142    process_id: ProcessId,
143    entity_id: String,
144    scope_id: String,
145}
146
147#[derive(Facet)]
148struct UpsertEdgeParams {
149    process_id: ProcessId,
150    src_id: String,
151    dst_id: String,
152    kind_json: String,
153    edge_json: String,
154    updated_at_ns: i64,
155}
156
157#[derive(Facet)]
158struct RemoveEdgeParams {
159    process_id: ProcessId,
160    src_id: String,
161    dst_id: String,
162    kind_json: String,
163}
164
165#[derive(Facet)]
166struct AppendEventParams {
167    process_id: ProcessId,
168    seq_no: u64,
169    event_id: String,
170    event_json: String,
171    at_ms: u64,
172}
173
174#[derive(Facet)]
175struct StreamCursorUpsertParams {
176    process_id: ProcessId,
177    next_seq_no: u64,
178    updated_at_ns: i64,
179}
180
181pub fn backtrace_frames_for_store(
182    module_manifest: &[StoredModuleManifestEntry],
183    record: &BacktraceRecord,
184) -> Result<Vec<BacktraceFramePersist>, String> {
185    let modules_by_id = module_manifest
186        .iter()
187        .map(|module| (module.module_id, module))
188        .collect::<std::collections::BTreeMap<_, _>>();
189    let mut frames = Vec::with_capacity(record.frames.len());
190    for (frame_index, frame) in record.frames.iter().enumerate() {
191        let module_id = frame.module_id;
192        let Some(module) = modules_by_id.get(&module_id) else {
193            return Err(format!(
194                "invariant violated: backtrace frame {frame_index} references module_id {} but handshake manifest for this connection has no matching module id ({} entries)",
195                module_id,
196                modules_by_id.len()
197            ));
198        };
199        frames.push(BacktraceFramePersist {
200            frame_index: frame_index as u32,
201            rel_pc: frame.rel_pc,
202            module_path: module.module_path.clone(),
203            module_identity: module.module_identity.clone(),
204        });
205    }
206    Ok(frames)
207}
208
209pub fn into_stored_module_manifest(
210    module_manifest: Vec<ModuleManifestEntry>,
211) -> Vec<StoredModuleManifestEntry> {
212    module_manifest
213        .into_iter()
214        .map(|module| StoredModuleManifestEntry {
215            module_id: module.module_id,
216            module_path: module.module_path,
217            module_identity: module_identity_key(&module.identity),
218            arch: module.arch,
219            runtime_base: module.runtime_base,
220        })
221        .collect()
222}
223
224fn module_identity_key(identity: &ModuleIdentity) -> String {
225    match identity {
226        ModuleIdentity::BuildId(build_id) => format!("build_id:{build_id}"),
227        ModuleIdentity::DebugId(debug_id) => format!("debug_id:{debug_id}"),
228    }
229}
230
231pub async fn persist_connection_upsert(
232    db: Arc<Db>,
233    conn_id: ConnectionId,
234    process_id: ProcessId,
235    process_name: String,
236    pid: u32,
237) -> Result<(), String> {
238    tokio::task::spawn_blocking(move || {
239        let conn = db.open()?;
240        conn.facet_execute_ref(
241            "INSERT INTO connections (conn_id, process_id, process_name, pid, connected_at_ns, disconnected_at_ns)
242             VALUES (:conn_id, :process_id, :process_name, :pid, :connected_at_ns, NULL)
243             ON CONFLICT(conn_id) DO UPDATE SET
244               process_id = excluded.process_id,
245               process_name = excluded.process_name,
246               pid = excluded.pid",
247            &ConnectionUpsertParams {
248                conn_id,
249                process_id,
250                process_name,
251                pid,
252                connected_at_ns: now_nanos(),
253            },
254        )
255        .map_err(|error| format!("upsert connection: {error}"))?;
256        Ok::<(), String>(())
257    })
258    .await
259    .map_err(|error| format!("join sqlite: {error}"))?
260}
261
262pub async fn persist_connection_closed(db: Arc<Db>, conn_id: ConnectionId) -> Result<(), String> {
263    tokio::task::spawn_blocking(move || {
264        let conn = db.open()?;
265        conn.facet_execute_ref(
266            "UPDATE connections
267             SET disconnected_at_ns = :disconnected_at_ns
268             WHERE conn_id = :conn_id",
269            &ConnectionClosedParams {
270                conn_id,
271                disconnected_at_ns: now_nanos(),
272            },
273        )
274        .map_err(|error| format!("close connection: {error}"))?;
275        Ok::<(), String>(())
276    })
277    .await
278    .map_err(|error| format!("join sqlite: {error}"))?
279}
280
281pub async fn persist_connection_module_manifest(
282    db: Arc<Db>,
283    process_id: ProcessId,
284    module_manifest: Vec<StoredModuleManifestEntry>,
285) -> Result<(), String> {
286    tokio::task::spawn_blocking(move || {
287        let mut conn = db.open()?;
288        let tx = conn
289            .transaction()
290            .map_err(|error| format!("start transaction: {error}"))?;
291        {
292            let mut delete_stmt = tx
293                .prepare("DELETE FROM connection_modules WHERE process_id = :process_id")
294                .map_err(|error| format!("prepare delete connection_modules: {error}"))?;
295            delete_stmt
296                .facet_execute_ref(&ProcessIdParams {
297                    process_id: process_id.clone(),
298                })
299                .map_err(|error| format!("delete connection_modules: {error}"))?;
300        }
301
302        {
303            let mut insert_stmt = tx
304                .prepare(
305                    "INSERT INTO connection_modules (
306                        process_id, module_id, module_index, module_path, module_identity, arch, runtime_base
307                     ) VALUES (
308                        :process_id, :module_id, :module_index, :module_path, :module_identity, :arch, :runtime_base
309                     )",
310                )
311                .map_err(|error| format!("prepare insert connection_modules: {error}"))?;
312            for (module_index, module) in module_manifest.iter().enumerate() {
313                insert_stmt
314                    .facet_execute_ref(&ConnectionModuleInsertParams {
315                        process_id: process_id.clone(),
316                        module_id: module.module_id,
317                        module_index: module_index as i64,
318                        module_path: module.module_path.clone(),
319                        module_identity: module.module_identity.clone(),
320                        arch: module.arch.clone(),
321                        runtime_base: module.runtime_base,
322                    })
323                    .map_err(|error| format!("insert connection_module[{module_index}]: {error}"))?;
324            }
325        }
326        tx.commit()
327            .map_err(|error| format!("commit connection_modules: {error}"))?;
328        Ok::<(), String>(())
329    })
330    .await
331    .map_err(|error| format!("join sqlite: {error}"))?
332}
333
334// r[impl symbolicate.server-store]
335pub async fn persist_backtrace_record(
336    db: Arc<Db>,
337    process_id: ProcessId,
338    backtrace_id: BacktraceId,
339    frames: Vec<BacktraceFramePersist>,
340) -> Result<bool, String> {
341    tokio::task::spawn_blocking(move || {
342        let mut conn = db.open()?;
343        let tx = conn
344            .transaction()
345            .map_err(|error| format!("start transaction: {error}"))?;
346        let inserted = {
347            let mut insert_backtrace_stmt = tx
348                .prepare(
349                    "INSERT INTO backtraces (process_id, backtrace_id, frame_count, received_at_ns)
350                     VALUES (:process_id, :backtrace_id, :frame_count, :received_at_ns)
351                     ON CONFLICT(backtrace_id) DO NOTHING",
352                )
353                .map_err(|error| format!("prepare insert backtrace: {error}"))?;
354            insert_backtrace_stmt
355                .facet_execute_ref(&BacktraceInsertParams {
356                    process_id: process_id.clone(),
357                    backtrace_id,
358                    frame_count: frames.len() as i64,
359                    received_at_ns: now_nanos(),
360                })
361                .map_err(|error| format!("insert backtrace: {error}"))?
362                > 0
363        };
364        if inserted {
365            {
366                let mut insert_frame_stmt = tx
367                    .prepare(
368                        "INSERT INTO backtrace_frames (
369                            process_id, backtrace_id, frame_index, module_path, module_identity, rel_pc
370                         ) VALUES (
371                            :process_id, :backtrace_id, :frame_index, :module_path, :module_identity, :rel_pc
372                         )",
373                    )
374                    .map_err(|error| format!("prepare insert backtrace frames: {error}"))?;
375                for frame in &frames {
376                    insert_frame_stmt
377                        .facet_execute_ref(&BacktraceFrameInsertParams {
378                            process_id: process_id.clone(),
379                            backtrace_id,
380                            frame_index: frame.frame_index,
381                            module_path: frame.module_path.clone(),
382                            module_identity: frame.module_identity.clone(),
383                            rel_pc: frame.rel_pc,
384                        })
385                        .map_err(|error| {
386                            format!(
387                                "insert backtrace frame {}/{}: {error}",
388                                frame.frame_index,
389                                backtrace_id
390                            )
391                        })?;
392                }
393            }
394        }
395        tx.commit()
396            .map_err(|error| format!("commit backtrace record: {error}"))?;
397        Ok::<bool, String>(inserted)
398    })
399    .await
400    .map_err(|error| format!("join sqlite: {error}"))?
401}
402
403pub async fn persist_cut_request(
404    db: Arc<Db>,
405    cut_id: String,
406    requested_at_ns: i64,
407) -> Result<(), String> {
408    tokio::task::spawn_blocking(move || {
409        let conn = db.open()?;
410        conn.facet_execute_ref(
411            "INSERT INTO cuts (cut_id, requested_at_ns) VALUES (?1, ?2)
412             ON CONFLICT(cut_id) DO UPDATE SET requested_at_ns = excluded.requested_at_ns",
413            &CutRequestParams {
414                cut_id,
415                requested_at_ns,
416            },
417        )
418        .map_err(|error| format!("upsert cut: {error}"))?;
419        Ok::<(), String>(())
420    })
421    .await
422    .map_err(|error| format!("join sqlite: {error}"))?
423}
424
425pub async fn persist_cut_ack(
426    db: Arc<Db>,
427    cut_id: String,
428    process_id: ProcessId,
429    stream_id: String,
430    next_seq_no: u64,
431) -> Result<(), String> {
432    tokio::task::spawn_blocking(move || {
433        if stream_id != process_id.as_str() {
434            return Err(format!(
435                "invariant violated: cut ack stream_id '{}' does not match process_id '{}'",
436                stream_id,
437                process_id.as_str()
438            ));
439        }
440        let conn = db.open()?;
441        conn.facet_execute_ref(
442            "INSERT INTO cut_acks (cut_id, process_id, next_seq_no, received_at_ns)
443             VALUES (?1, ?2, ?3, ?4)
444             ON CONFLICT(cut_id, process_id) DO UPDATE SET
445               next_seq_no = excluded.next_seq_no,
446               received_at_ns = excluded.received_at_ns",
447            &CutAckParams {
448                cut_id,
449                process_id,
450                next_seq_no,
451                received_at_ns: now_nanos(),
452            },
453        )
454        .map_err(|error| format!("upsert cut ack: {error}"))?;
455        Ok::<(), String>(())
456    })
457    .await
458    .map_err(|error| format!("join sqlite: {error}"))?
459}
460
461pub async fn persist_delta_batch(
462    db: Arc<Db>,
463    process_id: ProcessId,
464    batch: moire_types::PullChangesResponse,
465) -> Result<(), String> {
466    tokio::task::spawn_blocking(move || persist_delta_batch_blocking(&db, process_id, &batch))
467        .await
468        .map_err(|error| format!("join sqlite: {error}"))?
469}
470
471fn persist_delta_batch_blocking(
472    db: &Db,
473    process_id: ProcessId,
474    batch: &moire_types::PullChangesResponse,
475) -> Result<(), String> {
476    use moire_types::Change;
477
478    let mut conn = db.open()?;
479    let tx = conn
480        .transaction()
481        .map_err(|error| format!("start transaction: {error}"))?;
482    if batch.stream_id.0.as_str() != process_id.as_str() {
483        return Err(format!(
484            "invariant violated: delta batch stream_id '{}' does not match process_id '{}'",
485            batch.stream_id.0,
486            process_id.as_str()
487        ));
488    }
489    let received_at_ns = now_nanos();
490    let payload_json =
491        facet_json::to_string(batch).map_err(|error| format!("encode batch: {error}"))?;
492
493    {
494        let mut insert_delta_batch_stmt = tx
495            .prepare(
496                "INSERT INTO delta_batches (
497                process_id, from_seq_no, next_seq_no, truncated,
498                compacted_before_seq_no, change_count, payload_json, received_at_ns
499             ) VALUES (
500                :process_id, :from_seq_no, :next_seq_no, :truncated,
501                :compacted_before_seq_no, :change_count, :payload_json, :received_at_ns
502             )",
503            )
504            .map_err(|error| format!("prepare delta batch insert: {error}"))?;
505        insert_delta_batch_stmt
506            .facet_execute_ref(&DeltaBatchInsertParams {
507                process_id: process_id.clone(),
508                from_seq_no: batch.from_seq_no.0,
509                next_seq_no: batch.next_seq_no.0,
510                truncated: if batch.truncated { 1 } else { 0 },
511                compacted_before_seq_no: batch.compacted_before_seq_no.map(|seq_no| seq_no.0),
512                change_count: batch.changes.len() as u64,
513                payload_json,
514                received_at_ns,
515            })
516            .map_err(|error| format!("insert delta batch: {error}"))?;
517
518        let mut upsert_entity_stmt = tx
519            .prepare(
520                "INSERT INTO entities (process_id, entity_id, entity_json, updated_at_ns)
521             VALUES (:process_id, :entity_id, :entity_json, :updated_at_ns)
522             ON CONFLICT(entity_id) DO UPDATE SET
523               process_id = excluded.process_id,
524               entity_json = excluded.entity_json,
525               updated_at_ns = excluded.updated_at_ns",
526            )
527            .map_err(|error| format!("prepare entity upsert: {error}"))?;
528        let mut upsert_scope_stmt = tx
529            .prepare(
530                "INSERT INTO scopes (process_id, scope_id, scope_json, updated_at_ns)
531             VALUES (:process_id, :scope_id, :scope_json, :updated_at_ns)
532             ON CONFLICT(scope_id) DO UPDATE SET
533               process_id = excluded.process_id,
534               scope_json = excluded.scope_json,
535               updated_at_ns = excluded.updated_at_ns",
536            )
537            .map_err(|error| format!("prepare scope upsert: {error}"))?;
538        let mut upsert_entity_scope_link_stmt = tx
539            .prepare(
540                "INSERT INTO entity_scope_links (process_id, entity_id, scope_id, updated_at_ns)
541             VALUES (:process_id, :entity_id, :scope_id, :updated_at_ns)
542             ON CONFLICT(entity_id, scope_id) DO UPDATE SET
543               process_id = excluded.process_id,
544               updated_at_ns = excluded.updated_at_ns",
545            )
546            .map_err(|error| format!("prepare entity_scope_link upsert: {error}"))?;
547        let mut delete_entity_stmt = tx
548            .prepare(
549                "DELETE FROM entities
550             WHERE process_id = :process_id AND entity_id = :entity_id",
551            )
552            .map_err(|error| format!("prepare delete entity: {error}"))?;
553        let mut delete_entity_scope_links_for_entity_stmt = tx
554            .prepare(
555                "DELETE FROM entity_scope_links
556             WHERE process_id = :process_id AND entity_id = :entity_id",
557            )
558            .map_err(|error| format!("prepare delete entity_scope_links for entity: {error}"))?;
559        let mut delete_incident_edges_stmt = tx
560            .prepare(
561                "DELETE FROM edges
562             WHERE process_id = :process_id
563               AND (src_id = :entity_id OR dst_id = :entity_id)",
564            )
565            .map_err(|error| format!("prepare delete incident edges: {error}"))?;
566        let mut delete_scope_stmt = tx
567            .prepare(
568                "DELETE FROM scopes
569             WHERE process_id = :process_id AND scope_id = :scope_id",
570            )
571            .map_err(|error| format!("prepare delete scope: {error}"))?;
572        let mut delete_entity_scope_links_for_scope_stmt = tx
573            .prepare(
574                "DELETE FROM entity_scope_links
575             WHERE process_id = :process_id AND scope_id = :scope_id",
576            )
577            .map_err(|error| format!("prepare delete entity_scope_links for scope: {error}"))?;
578        let mut delete_entity_scope_link_stmt = tx
579            .prepare(
580                "DELETE FROM entity_scope_links
581             WHERE process_id = :process_id
582               AND entity_id = :entity_id AND scope_id = :scope_id",
583            )
584            .map_err(|error| format!("prepare delete entity_scope_link: {error}"))?;
585        let mut upsert_edge_stmt = tx
586        .prepare(
587            "INSERT INTO edges (process_id, src_id, dst_id, kind_json, edge_json, updated_at_ns)
588             VALUES (:process_id, :src_id, :dst_id, :kind_json, :edge_json, :updated_at_ns)
589             ON CONFLICT(src_id, dst_id, kind_json) DO UPDATE SET
590               process_id = excluded.process_id,
591               edge_json = excluded.edge_json,
592               updated_at_ns = excluded.updated_at_ns",
593        )
594        .map_err(|error| format!("prepare edge upsert: {error}"))?;
595        let mut delete_edge_stmt = tx
596            .prepare(
597                "DELETE FROM edges
598             WHERE process_id = :process_id
599               AND src_id = :src_id AND dst_id = :dst_id AND kind_json = :kind_json",
600            )
601            .map_err(|error| format!("prepare delete edge: {error}"))?;
602        let mut append_event_stmt = tx
603            .prepare(
604                "INSERT OR REPLACE INTO events (process_id, seq_no, event_id, event_json, at_ms)
605             VALUES (:process_id, :seq_no, :event_id, :event_json, :at_ms)",
606            )
607            .map_err(|error| format!("prepare append event: {error}"))?;
608
609        for stamped in &batch.changes {
610            match &stamped.change {
611                Change::UpsertEntity(entity) => {
612                    let entity_json = facet_json::to_string(entity)
613                        .map_err(|error| format!("encode entity: {error}"))?;
614                    upsert_entity_stmt
615                        .facet_execute_ref(&UpsertEntityParams {
616                            process_id: process_id.clone(),
617                            entity_id: entity.id.as_str().to_string(),
618                            entity_json,
619                            updated_at_ns: received_at_ns,
620                        })
621                        .map_err(|error| format!("upsert entity: {error}"))?;
622                }
623                Change::UpsertScope(scope) => {
624                    let scope_json = facet_json::to_string(scope)
625                        .map_err(|error| format!("encode scope: {error}"))?;
626                    upsert_scope_stmt
627                        .facet_execute_ref(&UpsertScopeParams {
628                            process_id: process_id.clone(),
629                            scope_id: scope.id.as_str().to_string(),
630                            scope_json,
631                            updated_at_ns: received_at_ns,
632                        })
633                        .map_err(|error| format!("upsert scope: {error}"))?;
634                }
635                Change::UpsertEntityScopeLink {
636                    entity_id,
637                    scope_id,
638                } => {
639                    upsert_entity_scope_link_stmt
640                        .facet_execute_ref(&UpsertEntityScopeLinkParams {
641                            process_id: process_id.clone(),
642                            entity_id: entity_id.as_str().to_string(),
643                            scope_id: scope_id.as_str().to_string(),
644                            updated_at_ns: received_at_ns,
645                        })
646                        .map_err(|error| format!("upsert entity_scope_link: {error}"))?;
647                }
648                Change::RemoveEntity { id } => {
649                    let params = RemoveEntityParams {
650                        process_id: process_id.clone(),
651                        entity_id: id.as_str().to_string(),
652                    };
653                    delete_entity_stmt
654                        .facet_execute_ref(&params)
655                        .map_err(|error| format!("delete entity: {error}"))?;
656                    delete_entity_scope_links_for_entity_stmt
657                        .facet_execute_ref(&params)
658                        .map_err(|error| {
659                            format!("delete entity_scope_links for entity: {error}")
660                        })?;
661                    delete_incident_edges_stmt
662                        .facet_execute_ref(&params)
663                        .map_err(|error| format!("delete incident edges: {error}"))?;
664                }
665                Change::RemoveScope { id } => {
666                    let params = RemoveScopeParams {
667                        process_id: process_id.clone(),
668                        scope_id: id.as_str().to_string(),
669                    };
670                    delete_scope_stmt
671                        .facet_execute_ref(&params)
672                        .map_err(|error| format!("delete scope: {error}"))?;
673                    delete_entity_scope_links_for_scope_stmt
674                        .facet_execute_ref(&params)
675                        .map_err(|error| format!("delete entity_scope_links for scope: {error}"))?;
676                }
677                Change::RemoveEntityScopeLink {
678                    entity_id,
679                    scope_id,
680                } => {
681                    delete_entity_scope_link_stmt
682                        .facet_execute_ref(&RemoveEntityScopeLinkParams {
683                            process_id: process_id.clone(),
684                            entity_id: entity_id.as_str().to_string(),
685                            scope_id: scope_id.as_str().to_string(),
686                        })
687                        .map_err(|error| format!("delete entity_scope_link: {error}"))?;
688                }
689                Change::UpsertEdge(edge) => {
690                    let kind_json = facet_json::to_string(&edge.kind)
691                        .map_err(|error| format!("encode edge kind: {error}"))?;
692                    let edge_json = facet_json::to_string(edge)
693                        .map_err(|error| format!("encode edge: {error}"))?;
694                    upsert_edge_stmt
695                        .facet_execute_ref(&UpsertEdgeParams {
696                            process_id: process_id.clone(),
697                            src_id: edge.src.as_str().to_string(),
698                            dst_id: edge.dst.as_str().to_string(),
699                            kind_json,
700                            edge_json,
701                            updated_at_ns: received_at_ns,
702                        })
703                        .map_err(|error| format!("upsert edge: {error}"))?;
704                }
705                Change::RemoveEdge { src, dst, kind } => {
706                    let kind_json = facet_json::to_string(kind)
707                        .map_err(|error| format!("encode edge kind: {error}"))?;
708                    delete_edge_stmt
709                        .facet_execute_ref(&RemoveEdgeParams {
710                            process_id: process_id.clone(),
711                            src_id: src.as_str().to_string(),
712                            dst_id: dst.as_str().to_string(),
713                            kind_json,
714                        })
715                        .map_err(|error| format!("delete edge: {error}"))?;
716                }
717                Change::AppendEvent(event) => {
718                    let event_json = facet_json::to_string(event)
719                        .map_err(|error| format!("encode event: {error}"))?;
720                    append_event_stmt
721                        .facet_execute_ref(&AppendEventParams {
722                            process_id: process_id.clone(),
723                            seq_no: stamped.seq_no.0,
724                            event_id: event.id.as_str().to_string(),
725                            event_json,
726                            at_ms: event.at.as_millis(),
727                        })
728                        .map_err(|error| format!("append event: {error}"))?;
729                }
730            }
731        }
732
733        let mut upsert_stream_cursor_stmt = tx
734            .prepare(
735                "INSERT INTO stream_cursors (process_id, next_seq_no, updated_at_ns)
736             VALUES (:process_id, :next_seq_no, :updated_at_ns)
737             ON CONFLICT(process_id) DO UPDATE SET
738               next_seq_no = excluded.next_seq_no,
739               updated_at_ns = excluded.updated_at_ns",
740            )
741            .map_err(|error| format!("prepare stream cursor upsert: {error}"))?;
742        upsert_stream_cursor_stmt
743            .facet_execute_ref(&StreamCursorUpsertParams {
744                process_id: process_id.clone(),
745                next_seq_no: batch.next_seq_no.0,
746                updated_at_ns: received_at_ns,
747            })
748            .map_err(|error| format!("upsert stream cursor: {error}"))?;
749    }
750
751    tx.commit()
752        .map_err(|error| format!("commit transaction: {error}"))?;
753    Ok(())
754}