1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
//! RFC-019 Stage B — `ff_signal_event` outbox producer helper.
//!
//! Every successful `deliver_signal` call INSERTs a row here inside
//! the same SERIALIZABLE transaction as the signal state-write. The
//! `ff_signal_event_notify_trg` trigger fires
//! `pg_notify('ff_signal_event', event_id::text)` at COMMIT; the
//! `subscribe_signal_delivery` loop wakes, runs its catch-up query,
//! and forwards matching rows to the subscriber as `StreamEvent`s.
//!
//! See `migrations/0007_signal_event_outbox.sql` for the schema.
use sqlx::Postgres;
use uuid::Uuid;
use crate::error::map_sqlx_error;
use ff_core::engine_error::EngineError;
/// Append one signal-delivery event to the outbox within an in-flight
/// transaction. The trigger fires NOTIFY at COMMIT.
pub(crate) async fn emit<'c>(
tx: &mut sqlx::Transaction<'c, Postgres>,
partition_key: i16,
execution_uuid: Uuid,
signal_id: &str,
waitpoint_id: Option<&str>,
source_identity: Option<&str>,
delivered_at_ms: i64,
) -> Result<(), EngineError> {
// #282 — populate `namespace` + `instance_tag` via co-transactional
// SELECT on `ff_exec_core`. See `lease_event::emit` for rationale.
sqlx::query(
"INSERT INTO ff_signal_event \
(execution_id, signal_id, waitpoint_id, source_identity, \
delivered_at_ms, partition_key, namespace, instance_tag) \
SELECT $1, $2, $3, $4, $5, $6, \
raw_fields->>'namespace', \
raw_fields->'tags'->>'cairn.instance_id' \
FROM ff_exec_core \
WHERE partition_key = $6 AND execution_id = $7::uuid \
UNION ALL \
SELECT $1, $2, $3, $4, $5, $6, NULL, NULL \
WHERE NOT EXISTS ( \
SELECT 1 FROM ff_exec_core \
WHERE partition_key = $6 AND execution_id = $7::uuid \
)",
)
.bind(execution_uuid.to_string())
.bind(signal_id)
.bind(waitpoint_id)
.bind(source_identity)
.bind(delivered_at_ms)
.bind(i32::from(partition_key))
.bind(execution_uuid)
.execute(&mut **tx)
.await
.map_err(map_sqlx_error)?;
Ok(())
}