Skip to main content

arcp_core/messages/
subscriptions.rs

1//! Subscription messages.
2//!
3//! Two related-but-distinct surfaces live here:
4//!
5//! - **v1.0-era envelope subscriptions** ([`SubscribePayload`],
6//!   [`SubscribeAcceptedPayload`], [`SubscribeEventPayload`],
7//!   [`UnsubscribePayload`], [`SubscribeClosedPayload`]): a generic
8//!   filter/backfill bus over the whole envelope stream (RFC §13). These
9//!   predate v1.1 and remain for backwards compatibility; new code that
10//!   only wants live job events should prefer the v1.1 form below.
11//! - **ARCP v1.1 §7.6 job subscriptions** ([`JobSubscribePayload`],
12//!   [`JobSubscribedPayload`], [`JobUnsubscribePayload`]): a cross-session
13//!   attach to a specific job's event stream, optionally with history
14//!   replay from a chosen sequence number.
15
16use serde::{Deserialize, Serialize};
17
18use crate::envelope::Priority;
19use crate::error::ErrorCode;
20use crate::ids::{JobId, MessageId, SessionId, StreamId, SubscriptionId, TraceId};
21
22/// Filter clauses for a `subscribe` request (RFC §13.2).
23///
24/// Within a clause, list elements are OR'ed; across clauses, all conditions
25/// are AND'ed.
26#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
27pub struct SubscriptionFilter {
28    /// Match envelopes whose `session_id` is in this list.
29    #[serde(default, skip_serializing_if = "Vec::is_empty")]
30    pub session_id: Vec<SessionId>,
31    /// Match envelopes whose `trace_id` is in this list.
32    #[serde(default, skip_serializing_if = "Vec::is_empty")]
33    pub trace_id: Vec<TraceId>,
34    /// Match envelopes whose `job_id` is in this list.
35    #[serde(default, skip_serializing_if = "Vec::is_empty")]
36    pub job_id: Vec<JobId>,
37    /// Match envelopes whose `stream_id` is in this list.
38    #[serde(default, skip_serializing_if = "Vec::is_empty")]
39    pub stream_id: Vec<StreamId>,
40    /// Match envelopes whose `type` is in this list.
41    #[serde(default, skip_serializing_if = "Vec::is_empty")]
42    pub types: Vec<String>,
43    /// Minimum priority.
44    #[serde(default, skip_serializing_if = "Option::is_none")]
45    pub min_priority: Option<Priority>,
46}
47
48/// `since` clause for backfill (RFC §13.3).
49#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
50pub struct SubscriptionSince {
51    /// Replay strictly after this message id.
52    #[serde(default, skip_serializing_if = "Option::is_none")]
53    pub after_message_id: Option<MessageId>,
54}
55
56/// Payload for `subscribe`.
57#[derive(Debug, Clone, Default, PartialEq, Eq, Serialize, Deserialize)]
58pub struct SubscribePayload {
59    /// Filter.
60    #[serde(default)]
61    pub filter: SubscriptionFilter,
62    /// Optional backfill clause.
63    #[serde(default, skip_serializing_if = "Option::is_none")]
64    pub since: Option<SubscriptionSince>,
65}
66
67/// Payload for `subscribe.accepted`.
68#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
69pub struct SubscribeAcceptedPayload {
70    /// Newly minted subscription id.
71    pub subscription_id: SubscriptionId,
72}
73
74/// Payload for `subscribe.event` — wraps another envelope.
75#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
76pub struct SubscribeEventPayload {
77    /// The wrapped event envelope (as JSON to avoid recursive enum issues).
78    pub event: serde_json::Value,
79}
80
81/// Payload for `unsubscribe`.
82#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
83pub struct UnsubscribePayload {
84    /// Subscription to terminate.
85    pub subscription_id: SubscriptionId,
86}
87
88/// Payload for `subscribe.closed`.
89#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
90pub struct SubscribeClosedPayload {
91    /// Subscription that was closed.
92    pub subscription_id: SubscriptionId,
93    /// Reason code.
94    pub code: ErrorCode,
95    /// Free-form reason.
96    pub reason: String,
97}
98
99// ----------------------------------------------------------------------
100// ARCP v1.1 §7.6 — cross-session job subscriptions.
101// ----------------------------------------------------------------------
102
103/// Payload for `job.subscribe` (ARCP v1.1 §7.6).
104///
105/// Lets one session attach to the live event stream of a job that was
106/// submitted in (possibly) another session. When `history` is `true`,
107/// buffered events with `seq > from_event_seq` are replayed before the
108/// runtime resumes live streaming.
109#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
110pub struct JobSubscribePayload {
111    /// Job to attach to.
112    pub job_id: JobId,
113    /// Replay floor (exclusive). If `history` is `true`, buffered events
114    /// with `seq > from_event_seq` are replayed before live streaming.
115    /// When `None`, the subscriber receives only live events.
116    #[serde(default, skip_serializing_if = "Option::is_none")]
117    pub from_event_seq: Option<u64>,
118    /// Whether to replay buffered history. Defaults to `false`.
119    #[serde(default, skip_serializing_if = "is_false")]
120    pub history: bool,
121}
122
123/// Payload for `job.subscribed` (ARCP v1.1 §7.6).
124#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
125pub struct JobSubscribedPayload {
126    /// Job that was attached to.
127    pub job_id: JobId,
128    /// Wire-level status string at subscription time (e.g.
129    /// `"running"`, `"completed"`).
130    pub current_status: String,
131    /// Resolved `name@version` (or bare `name`) the job is running.
132    pub agent: String,
133    /// Optional parent job id for delegated / child jobs.
134    #[serde(default, skip_serializing_if = "Option::is_none")]
135    pub parent_job_id: Option<JobId>,
136    /// Optional trace identifier inherited from the job.
137    #[serde(default, skip_serializing_if = "Option::is_none")]
138    pub trace_id: Option<String>,
139    /// Event sequence the subscription was attached at (the highest
140    /// `event_seq` the runtime had emitted for this job at acknowledgement
141    /// time).
142    pub subscribed_from: u64,
143    /// `true` if buffered history was replayed before the live tail.
144    #[serde(default, skip_serializing_if = "is_false")]
145    pub replayed: bool,
146}
147
148/// Payload for `job.unsubscribe` (ARCP v1.1 §7.6).
149///
150/// Cancels a previously acknowledged job subscription. The subscription
151/// does NOT grant cancel authority over the job itself — only the
152/// originating session may cancel a job.
153#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
154pub struct JobUnsubscribePayload {
155    /// Job whose subscription should be cancelled.
156    pub job_id: JobId,
157}
158
159#[allow(clippy::trivially_copy_pass_by_ref)]
160const fn is_false(b: &bool) -> bool {
161    !*b
162}