use rumqttc::v5::mqttbytes::QoS;
/// Selects the MQTT quality-of-service level used for an event type.
///
/// The decision is keyed purely on the event-type string, compared exactly
/// (case-sensitive):
///
/// * `message.delta`, `thinking.delta`, `tool.updated` → [`QoS::AtMostOnce`]
/// * `session.reset`, `agent.completed`, `decomposition.started`,
///   `decomposition.completed` → [`QoS::ExactlyOnce`]
/// * every other string, including empty or unrecognised ones →
///   [`QoS::AtLeastOnce`]
pub fn qos_for_event(event_type: &str) -> QoS {
    let is_streaming = matches!(
        event_type,
        "message.delta" | "thinking.delta" | "tool.updated"
    );
    if is_streaming {
        return QoS::AtMostOnce;
    }

    let is_lifecycle = matches!(
        event_type,
        "session.reset" | "agent.completed" | "decomposition.started" | "decomposition.completed"
    );
    if is_lifecycle {
        QoS::ExactlyOnce
    } else {
        // Anything not explicitly listed falls back to at-least-once.
        QoS::AtLeastOnce
    }
}
/// How the QoS level for an event is chosen: by the built-in per-event policy
/// or by a caller-forced level.
///
/// `PartialEq` is derived so overrides can be compared with `==` and used in
/// `assert_eq!`; the wrapped [`QoS`] already supports equality comparison.
#[derive(Debug, Clone, Copy, Default, PartialEq)]
pub enum QosOverride {
    /// Defer to [`qos_for_event`] for the event type. This is the `Default`.
    #[default]
    Policy,
    /// Use the wrapped level regardless of the event type.
    Force(QoS),
}
impl QosOverride {
    /// Returns the QoS level to use for `event_type` under this override.
    ///
    /// A [`QosOverride::Force`] value always yields its wrapped level and
    /// ignores `event_type`; [`QosOverride::Policy`] delegates to
    /// [`qos_for_event`].
    pub fn resolve(self, event_type: &str) -> QoS {
        // Kept as an exhaustive match so a new variant forces a decision here.
        match self {
            Self::Force(forced_level) => forced_level,
            Self::Policy => qos_for_event(event_type),
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Asserts that each event type in `events` maps to `expected` under the
    /// default policy. `#[track_caller]` keeps failures pointing at the test.
    #[track_caller]
    fn assert_policy(events: &[&str], expected: QoS) {
        for event in events {
            assert_eq!(qos_for_event(event), expected, "event type: {event}");
        }
    }

    #[test]
    fn streaming_events_use_qos0() {
        assert_policy(
            &["message.delta", "thinking.delta", "tool.updated"],
            QoS::AtMostOnce,
        );
    }

    #[test]
    fn lifecycle_events_use_qos2() {
        assert_policy(
            &[
                "session.reset",
                "agent.completed",
                "decomposition.started",
                "decomposition.completed",
            ],
            QoS::ExactlyOnce,
        );
    }

    #[test]
    fn state_transition_events_use_qos1() {
        assert_policy(
            &[
                "turn.started",
                "turn.ended",
                "tool.started",
                "tool.ended",
                "phase.changed",
            ],
            QoS::AtLeastOnce,
        );
    }

    #[test]
    fn override_forces_qos() {
        // A forced level must win even for an event the policy maps elsewhere.
        let forced = QosOverride::Force(QoS::AtMostOnce);
        assert_eq!(forced.resolve("session.reset"), QoS::AtMostOnce);
    }

    #[test]
    fn policy_delegates_to_default() {
        let cases = [
            ("message.delta", QoS::AtMostOnce),
            ("turn.started", QoS::AtLeastOnce),
        ];
        for (event, expected) in cases {
            assert_eq!(
                QosOverride::Policy.resolve(event),
                expected,
                "event type: {event}"
            );
        }
    }
}