use crate::convert::{
ProtoWorkflowId, ProtoWorkflowStatus, WireEnvelope, decode_core_value, encode_core_value,
};
use crate::error::WireError;
#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Message)]
pub struct SubscriptionRequest {
#[prost(oneof = "subscription_request::Subscription", tags = "1, 2, 3")]
pub subscription: Option<subscription_request::Subscription>,
}
pub mod subscription_request {
#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Oneof)]
pub enum Subscription {
#[prost(message, tag = "1")]
PerWorkflow(super::PerWorkflowSubscription),
#[prost(message, tag = "2")]
Filtered(super::FilteredSubscription),
#[prost(message, tag = "3")]
Firehose(super::FirehoseSubscription),
}
}
#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Message)]
pub struct PerWorkflowSubscription {
#[prost(string, tag = "1")]
pub namespace: String,
#[prost(message, optional, tag = "2")]
pub workflow_id: Option<ProtoWorkflowId>,
#[prost(uint64, optional, tag = "3")]
pub resume_from_seq: Option<u64>,
}
#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Message)]
pub struct FilteredSubscription {
#[prost(string, tag = "1")]
pub namespace: String,
#[prost(string, optional, tag = "2")]
pub workflow_type: Option<String>,
#[prost(enumeration = "ProtoWorkflowStatus", optional, tag = "3")]
pub status: Option<i32>,
#[prost(string, optional, tag = "4")]
pub namespace_selector: Option<String>,
}
#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Message)]
pub struct FirehoseSubscription {
#[prost(string, tag = "1")]
pub namespace: String,
}
#[derive(Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize, prost::Message)]
pub struct StreamedEvent {
#[prost(string, tag = "1")]
pub namespace: String,
#[prost(message, optional, tag = "2")]
pub event: Option<WireEnvelope>,
}
impl StreamedEvent {
pub fn encode(
namespace: impl Into<String>,
request_id: Option<String>,
event: &aion_core::Event,
) -> Result<Self, WireError> {
let namespace = namespace.into();
let event = encode_core_value(namespace.clone(), request_id, event)?;
Ok(Self {
namespace,
event: Some(event),
})
}
pub fn decode_event(&self) -> Result<aion_core::Event, WireError> {
let event = self
.event
.as_ref()
.ok_or_else(|| WireError::backend("streamed event envelope is missing"))?;
if event.namespace != self.namespace {
return Err(WireError::backend("streamed event namespace mismatch"));
}
decode_core_value(event)
}
}
pub fn encode_streamed_event(
namespace: impl Into<String>,
request_id: Option<String>,
event: &aion_core::Event,
) -> Result<StreamedEvent, WireError> {
StreamedEvent::encode(namespace, request_id, event)
}
#[cfg(test)]
mod tests {
use chrono::{DateTime, Utc};
use prost::Message;
use serde_json::json;
use super::{
FilteredSubscription, FirehoseSubscription, PerWorkflowSubscription, StreamedEvent,
SubscriptionRequest, encode_streamed_event, subscription_request,
};
use crate::convert::{ProtoWorkflowId, ProtoWorkflowStatus, WireEnvelope};
use crate::error::WireError;
fn workflow_id() -> aion_core::WorkflowId {
aion_core::WorkflowId::new(uuid::Uuid::nil())
}
fn recorded_at() -> Result<DateTime<Utc>, chrono::ParseError> {
Ok(DateTime::parse_from_rfc3339("2026-01-01T00:00:00Z")?.with_timezone(&Utc))
}
fn event_envelope() -> Result<aion_core::EventEnvelope, chrono::ParseError> {
Ok(aion_core::EventEnvelope {
seq: 1,
recorded_at: recorded_at()?,
workflow_id: workflow_id(),
})
}
#[test]
fn subscription_request_round_trips_all_variants() -> Result<(), Box<dyn std::error::Error>> {
let requests = [
SubscriptionRequest {
subscription: Some(subscription_request::Subscription::PerWorkflow(
PerWorkflowSubscription {
namespace: String::from("tenant-a"),
workflow_id: Some(ProtoWorkflowId::from(workflow_id())),
resume_from_seq: None,
},
)),
},
SubscriptionRequest {
subscription: Some(subscription_request::Subscription::PerWorkflow(
PerWorkflowSubscription {
namespace: String::from("tenant-a"),
workflow_id: Some(ProtoWorkflowId::from(workflow_id())),
resume_from_seq: Some(42),
},
)),
},
SubscriptionRequest {
subscription: Some(subscription_request::Subscription::Filtered(
FilteredSubscription {
namespace: String::from("tenant-a"),
workflow_type: Some(String::from("checkout")),
status: Some(ProtoWorkflowStatus::Running as i32),
namespace_selector: Some(String::from("tenant-a")),
},
)),
},
SubscriptionRequest {
subscription: Some(subscription_request::Subscription::Filtered(
FilteredSubscription {
namespace: String::from("tenant-a"),
workflow_type: None,
status: None,
namespace_selector: None,
},
)),
},
SubscriptionRequest {
subscription: Some(subscription_request::Subscription::Firehose(
FirehoseSubscription {
namespace: String::from("tenant-a"),
},
)),
},
];
for request in requests {
let json = serde_json::to_vec(&request)?;
let from_json: SubscriptionRequest = serde_json::from_slice(&json)?;
assert_eq!(from_json, request);
let bytes = request.encode_to_vec();
let from_proto = SubscriptionRequest::decode(bytes.as_slice())?;
assert_eq!(from_proto, request);
}
Ok(())
}
#[test]
fn per_workflow_resume_cursor_round_trips_prost() -> Result<(), Box<dyn std::error::Error>> {
let with_cursor = PerWorkflowSubscription {
namespace: String::from("tenant-a"),
workflow_id: Some(ProtoWorkflowId::from(workflow_id())),
resume_from_seq: Some(7),
};
let decoded = PerWorkflowSubscription::decode(with_cursor.encode_to_vec().as_slice())?;
assert_eq!(decoded, with_cursor);
assert_eq!(decoded.resume_from_seq, Some(7));
let without_cursor = PerWorkflowSubscription {
namespace: String::from("tenant-a"),
workflow_id: Some(ProtoWorkflowId::from(workflow_id())),
resume_from_seq: None,
};
let decoded = PerWorkflowSubscription::decode(without_cursor.encode_to_vec().as_slice())?;
assert_eq!(decoded, without_cursor);
assert_eq!(decoded.resume_from_seq, None);
Ok(())
}
#[test]
fn per_workflow_resume_cursor_json_shape_is_pinned() -> Result<(), Box<dyn std::error::Error>> {
let with_cursor = PerWorkflowSubscription {
namespace: String::from("tenant-a"),
workflow_id: Some(ProtoWorkflowId::from(workflow_id())),
resume_from_seq: Some(7),
};
let value = serde_json::to_value(&with_cursor)?;
assert_eq!(
value,
json!({
"namespace": "tenant-a",
"workflow_id": { "uuid": "00000000-0000-0000-0000-000000000000" },
"resume_from_seq": 7,
})
);
let from_json: PerWorkflowSubscription = serde_json::from_value(value)?;
assert_eq!(from_json, with_cursor);
let without_cursor = PerWorkflowSubscription {
namespace: String::from("tenant-a"),
workflow_id: Some(ProtoWorkflowId::from(workflow_id())),
resume_from_seq: None,
};
let value = serde_json::to_value(&without_cursor)?;
assert_eq!(
value,
json!({
"namespace": "tenant-a",
"workflow_id": { "uuid": "00000000-0000-0000-0000-000000000000" },
"resume_from_seq": null,
})
);
let from_json: PerWorkflowSubscription = serde_json::from_value(value)?;
assert_eq!(from_json, without_cursor);
Ok(())
}
#[test]
fn subscription_request_without_resume_field_decodes_to_none()
-> Result<(), Box<dyn std::error::Error>> {
let request: SubscriptionRequest = serde_json::from_value(json!({
"subscription": {
"PerWorkflow": {
"namespace": "tenant-a",
"workflow_id": { "uuid": "00000000-0000-0000-0000-000000000000" },
}
}
}))?;
let Some(subscription_request::Subscription::PerWorkflow(per_workflow)) =
request.subscription
else {
return Err(Box::from("expected a per-workflow subscription"));
};
assert_eq!(per_workflow.namespace, "tenant-a");
assert_eq!(
per_workflow.workflow_id,
Some(ProtoWorkflowId::from(workflow_id()))
);
assert_eq!(per_workflow.resume_from_seq, None);
Ok(())
}
#[test]
fn streamed_event_round_trips_core_event() -> Result<(), Box<dyn std::error::Error>> {
let event = aion_core::Event::WorkflowStarted {
envelope: event_envelope()?,
workflow_type: String::from("checkout"),
input: aion_core::Payload::from_json(&json!({ "cart": ["sku-1"] }))?,
run_id: aion_core::RunId::new(uuid::Uuid::from_u128(1)),
parent_run_id: None,
package_version: aion_core::PackageVersion::new("a".repeat(64)),
};
let frame = encode_streamed_event("tenant-a", Some(String::from("request-1")), &event)?;
assert_eq!(frame.namespace, "tenant-a");
let envelope = frame
.event
.as_ref()
.ok_or_else(|| WireError::backend("test streamed event envelope is missing"))?;
assert_eq!(envelope.namespace, "tenant-a");
assert_eq!(envelope.request_id.as_deref(), Some("request-1"));
let decoded = frame.decode_event()?;
assert_eq!(decoded, event);
Ok(())
}
#[test]
fn streamed_event_rejects_namespace_mismatch() {
let frame = StreamedEvent {
namespace: String::from("tenant-a"),
event: Some(WireEnvelope {
namespace: String::from("tenant-b"),
request_id: None,
payload: None,
}),
};
assert_eq!(
frame.decode_event(),
Err(WireError::backend("streamed event namespace mismatch"))
);
}
#[test]
fn streamed_event_rejects_missing_envelope() {
let frame = StreamedEvent {
namespace: String::from("tenant-a"),
event: None,
};
assert_eq!(
frame.decode_event(),
Err(WireError::backend("streamed event envelope is missing"))
);
}
}