arcp_core/messages/subscriptions.rs
1//! Subscription messages.
2//!
3//! Two related-but-distinct surfaces live here:
4//!
5//! - **v1.0-era envelope subscriptions** ([`SubscribePayload`],
6//! [`SubscribeAcceptedPayload`], [`SubscribeEventPayload`],
7//! [`UnsubscribePayload`], [`SubscribeClosedPayload`]): a generic
8//! filter/backfill bus over the whole envelope stream (RFC §13). These
9//! predate v1.1 and remain for backwards compatibility; new code that
10//! only wants live job events should prefer the v1.1 form below.
11//! - **ARCP v1.1 §7.6 job subscriptions** ([`JobSubscribePayload`],
12//! [`JobSubscribedPayload`], [`JobUnsubscribePayload`]): a cross-session
13//! attach to a specific job's event stream, optionally with history
14//! replay from a chosen sequence number.
15
16use serde::{Deserialize, Serialize};
17
18use crate::envelope::Priority;
19use crate::error::ErrorCode;
20use crate::ids::{JobId, MessageId, SessionId, StreamId, SubscriptionId, TraceId};
21
22/// Filter clauses for a `subscribe` request (RFC §13.2).
23///
24/// Within a clause, list elements are OR'ed; across clauses, all conditions
25/// are AND'ed.
26#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
27pub struct SubscriptionFilter {
28 /// Match envelopes whose `session_id` is in this list.
29 #[serde(default, skip_serializing_if = "Vec::is_empty")]
30 pub session_id: Vec<SessionId>,
31 /// Match envelopes whose `trace_id` is in this list.
32 #[serde(default, skip_serializing_if = "Vec::is_empty")]
33 pub trace_id: Vec<TraceId>,
34 /// Match envelopes whose `job_id` is in this list.
35 #[serde(default, skip_serializing_if = "Vec::is_empty")]
36 pub job_id: Vec<JobId>,
37 /// Match envelopes whose `stream_id` is in this list.
38 #[serde(default, skip_serializing_if = "Vec::is_empty")]
39 pub stream_id: Vec<StreamId>,
40 /// Match envelopes whose `type` is in this list.
41 #[serde(default, skip_serializing_if = "Vec::is_empty")]
42 pub types: Vec<String>,
43 /// Minimum priority.
44 #[serde(default, skip_serializing_if = "Option::is_none")]
45 pub min_priority: Option<Priority>,
46}
47
48/// `since` clause for backfill (RFC §13.3).
49#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
50pub struct SubscriptionSince {
51 /// Replay strictly after this message id.
52 #[serde(default, skip_serializing_if = "Option::is_none")]
53 pub after_message_id: Option<MessageId>,
54}
55
56/// Payload for `subscribe`.
57#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
58pub struct SubscribePayload {
59 /// Filter.
60 #[serde(default)]
61 pub filter: SubscriptionFilter,
62 /// Optional backfill clause.
63 #[serde(default, skip_serializing_if = "Option::is_none")]
64 pub since: Option<SubscriptionSince>,
65}
66
67/// Payload for `subscribe.accepted`.
68#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
69pub struct SubscribeAcceptedPayload {
70 /// Newly minted subscription id.
71 pub subscription_id: SubscriptionId,
72}
73
74/// Payload for `subscribe.event` — wraps another envelope.
75#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
76pub struct SubscribeEventPayload {
77 /// The wrapped event envelope (as JSON to avoid recursive enum issues).
78 pub event: serde_json::Value,
79}
80
81/// Payload for `unsubscribe`.
82#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
83pub struct UnsubscribePayload {
84 /// Subscription to terminate.
85 pub subscription_id: SubscriptionId,
86}
87
88/// Payload for `subscribe.closed`.
89#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
90pub struct SubscribeClosedPayload {
91 /// Subscription that was closed.
92 pub subscription_id: SubscriptionId,
93 /// Reason code.
94 pub code: ErrorCode,
95 /// Free-form reason.
96 pub reason: String,
97}
98
99// ----------------------------------------------------------------------
100// ARCP v1.1 §7.6 — cross-session job subscriptions.
101// ----------------------------------------------------------------------
102
103/// Payload for `job.subscribe` (ARCP v1.1 §7.6).
104///
105/// Lets one session attach to the live event stream of a job that was
106/// submitted in (possibly) another session. When `history` is `true`,
107/// buffered events with `seq > from_event_seq` are replayed before the
108/// runtime resumes live streaming.
109#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
110pub struct JobSubscribePayload {
111 /// Job to attach to.
112 pub job_id: JobId,
113 /// Replay floor (exclusive). If `history` is `true`, buffered events
114 /// with `seq > from_event_seq` are replayed before live streaming.
115 /// When `None`, the subscriber receives only live events.
116 #[serde(default, skip_serializing_if = "Option::is_none")]
117 pub from_event_seq: Option<u64>,
118 /// Whether to replay buffered history. Defaults to `false`.
119 #[serde(default, skip_serializing_if = "is_false")]
120 pub history: bool,
121}
122
123/// Payload for `job.subscribed` (ARCP v1.1 §7.6).
124#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
125pub struct JobSubscribedPayload {
126 /// Job that was attached to.
127 pub job_id: JobId,
128 /// Wire-level status string at subscription time (e.g.
129 /// `"running"`, `"completed"`).
130 pub current_status: String,
131 /// Resolved `name@version` (or bare `name`) the job is running.
132 pub agent: String,
133 /// Optional parent job id for delegated / child jobs.
134 #[serde(default, skip_serializing_if = "Option::is_none")]
135 pub parent_job_id: Option<JobId>,
136 /// Optional trace identifier inherited from the job.
137 #[serde(default, skip_serializing_if = "Option::is_none")]
138 pub trace_id: Option<String>,
139 /// Event sequence the subscription was attached at (the highest
140 /// `event_seq` the runtime had emitted for this job at acknowledgement
141 /// time).
142 pub subscribed_from: u64,
143 /// `true` if buffered history was replayed before the live tail.
144 #[serde(default, skip_serializing_if = "is_false")]
145 pub replayed: bool,
146}
147
148/// Payload for `job.unsubscribe` (ARCP v1.1 §7.6).
149///
150/// Cancels a previously acknowledged job subscription. The subscription
151/// does NOT grant cancel authority over the job itself — only the
152/// originating session may cancel a job.
153#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
154pub struct JobUnsubscribePayload {
155 /// Job whose subscription should be cancelled.
156 pub job_id: JobId,
157}
158
/// Returns `true` exactly when `*b` is `false`.
///
/// Named by the `skip_serializing_if = "is_false"` attributes in this file so
/// that `false` flags are left out of the serialized form.
// serde calls `skip_serializing_if` predicates with a reference to the field,
// so the signature has to take `&bool`; hence the clippy allow below.
#[allow(clippy::trivially_copy_pass_by_ref)]
const fn is_false(b: &bool) -> bool {
    let flag = *b;
    !flag
}
162}