use alloc::collections::BTreeMap;
use alloc::string::String;
use alloc::vec::Vec;
use core::sync::atomic::{AtomicU64, Ordering};
use zerodds_amqp_bridge::extended_types::AmqpExtValue;
use crate::mapping::BodyEncodingMode;
use crate::metrics::MetricsHub;
pub const DEFAULT_RPC_TIMEOUT_MS: u64 = 30_000;
pub const DEFAULT_MAX_OUTSTANDING_CALLS: usize = 4096;
#[derive(Debug, Clone)]
pub struct RpcConfig {
pub rpc_aware: bool,
pub rpc_timeout_ms: u64,
pub max_outstanding: usize,
pub reply_body_mode: BodyEncodingMode,
}
impl Default for RpcConfig {
fn default() -> Self {
Self {
rpc_aware: false,
rpc_timeout_ms: DEFAULT_RPC_TIMEOUT_MS,
max_outstanding: DEFAULT_MAX_OUTSTANDING_CALLS,
reply_body_mode: BodyEncodingMode::PassThrough,
}
}
}
pub type CorrelationId = String;
#[derive(Debug, Clone)]
pub struct OutstandingCall {
pub request_id: CorrelationId,
pub issued_at_ms: u64,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ReplyDecision {
Surface {
correlation: CorrelationId,
},
RejectMalformed {
error: &'static str,
},
DropUnknown,
DecodeFailure {
error: &'static str,
},
DropLateReply,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum IssueDecision {
Accepted,
OutOfResources,
}
#[derive(Debug)]
pub struct OutstandingCalls {
cfg: RpcConfig,
table: BTreeMap<CorrelationId, OutstandingCall>,
next_call_id: AtomicU64,
}
impl OutstandingCalls {
#[must_use]
pub fn new(cfg: RpcConfig) -> Self {
Self {
cfg,
table: BTreeMap::new(),
next_call_id: AtomicU64::new(1),
}
}
#[must_use]
pub fn outstanding(&self) -> usize {
self.table.len()
}
pub fn issue(&mut self, request_id: CorrelationId, issued_at_ms: u64) -> IssueDecision {
if self.table.len() >= self.cfg.max_outstanding {
return IssueDecision::OutOfResources;
}
self.table.insert(
request_id.clone(),
OutstandingCall {
request_id,
issued_at_ms,
},
);
let _ = self.next_call_id.fetch_add(1, Ordering::Relaxed);
IssueDecision::Accepted
}
pub fn validate_reply(
&mut self,
properties: &ReplyProperties,
now_ms: u64,
body_mode: BodyEncodingMode,
body_decoded_ok: bool,
metrics: &MetricsHub,
) -> ReplyDecision {
let Some(correlation) = properties.correlation_id.as_ref() else {
metrics.on_dropped_malformed_reply();
return ReplyDecision::RejectMalformed {
error: "amqp:precondition-failed",
};
};
let Some(call) = self.table.remove(correlation) else {
metrics.on_dropped_malformed_reply();
return ReplyDecision::DropUnknown;
};
if now_ms.saturating_sub(call.issued_at_ms) > self.cfg.rpc_timeout_ms {
metrics.on_dropped_malformed_reply();
return ReplyDecision::DropLateReply;
}
if !body_decoded_ok {
metrics.on_decode_error();
return ReplyDecision::DecodeFailure {
error: "amqp:decode-error",
};
}
let _ = body_mode;
ReplyDecision::Surface {
correlation: call.request_id,
}
}
pub fn expire_overdue(&mut self, now_ms: u64, metrics: &MetricsHub) -> Vec<CorrelationId> {
let cutoff_window = self.cfg.rpc_timeout_ms;
let expired: Vec<CorrelationId> = self
.table
.iter()
.filter_map(|(id, call)| {
if now_ms.saturating_sub(call.issued_at_ms) > cutoff_window {
Some(id.clone())
} else {
None
}
})
.collect();
for id in &expired {
self.table.remove(id);
metrics.on_rpc_timeout();
}
expired
}
#[must_use]
pub fn capacity(&self) -> usize {
self.cfg.max_outstanding
}
#[must_use]
pub fn config(&self) -> &RpcConfig {
&self.cfg
}
}
#[derive(Debug, Clone, Default)]
pub struct ReplyProperties {
pub correlation_id: Option<CorrelationId>,
pub reply_to: Option<String>,
}
impl ReplyProperties {
#[must_use]
pub fn from_amqp(correlation_id: Option<&AmqpExtValue>) -> Self {
let id = correlation_id.and_then(|v| match v {
AmqpExtValue::Str(s) => Some(s.clone()),
AmqpExtValue::Symbol(s) => Some(s.clone()),
AmqpExtValue::Uuid(bytes) => Some(format_uuid(*bytes)),
AmqpExtValue::Binary(b) => Some(hex_lower(b)),
_ => None,
});
Self {
correlation_id: id,
reply_to: None,
}
}
}
fn format_uuid(b: [u8; 16]) -> String {
let mut s = String::with_capacity(36);
for (i, byte) in b.iter().enumerate() {
if matches!(i, 4 | 6 | 8 | 10) {
s.push('-');
}
let _ = core::fmt::Write::write_fmt(&mut s, core::format_args!("{byte:02x}"));
}
s
}
fn hex_lower(bytes: &[u8]) -> String {
let mut out = String::with_capacity(bytes.len() * 2);
for b in bytes {
let _ = core::fmt::Write::write_fmt(&mut out, core::format_args!("{b:02x}"));
}
out
}
#[cfg(test)]
#[allow(clippy::expect_used, clippy::unwrap_used, clippy::panic)]
mod tests {
use super::*;
fn cfg(rpc_aware: bool) -> RpcConfig {
RpcConfig {
rpc_aware,
..RpcConfig::default()
}
}
fn issue(table: &mut OutstandingCalls, id: &str, t: u64) {
assert_eq!(table.issue(id.to_string(), t), IssueDecision::Accepted);
}
#[test]
fn defaults_match_spec() {
assert_eq!(DEFAULT_RPC_TIMEOUT_MS, 30_000);
assert_eq!(DEFAULT_MAX_OUTSTANDING_CALLS, 4096);
let c = RpcConfig::default();
assert!(!c.rpc_aware);
}
#[test]
fn issue_accepts_then_full() {
let mut t = OutstandingCalls::new(RpcConfig {
max_outstanding: 2,
..cfg(true)
});
issue(&mut t, "a", 0);
issue(&mut t, "b", 0);
assert_eq!(t.issue("c".to_string(), 0), IssueDecision::OutOfResources);
assert_eq!(t.outstanding(), 2);
assert_eq!(t.capacity(), 2);
}
#[test]
fn validate_reply_rejects_missing_correlation_id() {
let metrics = MetricsHub::new();
let mut t = OutstandingCalls::new(cfg(true));
let props = ReplyProperties::default();
let d = t.validate_reply(&props, 100, BodyEncodingMode::PassThrough, true, &metrics);
assert!(matches!(d, ReplyDecision::RejectMalformed { .. }));
assert_eq!(
metrics.snapshot(crate::metrics::names::TRANSFERS_DROPPED_MALFORMED_REPLY),
Some(1)
);
}
#[test]
fn validate_reply_drops_unknown_correlation_id() {
let metrics = MetricsHub::new();
let mut t = OutstandingCalls::new(cfg(true));
let props = ReplyProperties {
correlation_id: Some("ghost".into()),
..Default::default()
};
let d = t.validate_reply(&props, 100, BodyEncodingMode::PassThrough, true, &metrics);
assert_eq!(d, ReplyDecision::DropUnknown);
assert_eq!(
metrics.snapshot(crate::metrics::names::TRANSFERS_DROPPED_MALFORMED_REPLY),
Some(1)
);
}
#[test]
fn validate_reply_surfaces_matched_call() {
let metrics = MetricsHub::new();
let mut t = OutstandingCalls::new(cfg(true));
issue(&mut t, "req-1", 100);
let props = ReplyProperties {
correlation_id: Some("req-1".into()),
..Default::default()
};
let d = t.validate_reply(&props, 200, BodyEncodingMode::PassThrough, true, &metrics);
assert_eq!(
d,
ReplyDecision::Surface {
correlation: "req-1".into()
}
);
assert_eq!(t.outstanding(), 0);
assert_eq!(
metrics.snapshot(crate::metrics::names::TRANSFERS_DROPPED_MALFORMED_REPLY),
Some(0)
);
}
#[test]
fn validate_reply_decode_failure_reports() {
let metrics = MetricsHub::new();
let mut t = OutstandingCalls::new(cfg(true));
issue(&mut t, "req-1", 100);
let props = ReplyProperties {
correlation_id: Some("req-1".into()),
..Default::default()
};
let d = t.validate_reply(
&props,
200,
BodyEncodingMode::PassThrough,
false,
&metrics,
);
assert!(matches!(d, ReplyDecision::DecodeFailure { .. }));
assert_eq!(
metrics.snapshot(crate::metrics::names::ERRORS_DECODE),
Some(1)
);
}
#[test]
fn validate_reply_late_reply_dropped() {
let metrics = MetricsHub::new();
let mut t = OutstandingCalls::new(RpcConfig {
rpc_timeout_ms: 100,
..cfg(true)
});
issue(&mut t, "req-1", 0);
let props = ReplyProperties {
correlation_id: Some("req-1".into()),
..Default::default()
};
let d = t.validate_reply(&props, 200, BodyEncodingMode::PassThrough, true, &metrics);
assert_eq!(d, ReplyDecision::DropLateReply);
assert_eq!(
metrics.snapshot(crate::metrics::names::TRANSFERS_DROPPED_MALFORMED_REPLY),
Some(1)
);
}
#[test]
fn expire_overdue_removes_and_counts() {
let metrics = MetricsHub::new();
let mut t = OutstandingCalls::new(RpcConfig {
rpc_timeout_ms: 100,
..cfg(true)
});
issue(&mut t, "a", 0);
issue(&mut t, "b", 50);
issue(&mut t, "c", 200);
let expired = t.expire_overdue(200, &metrics);
assert_eq!(expired.len(), 2);
assert!(expired.contains(&"a".to_string()));
assert!(expired.contains(&"b".to_string()));
assert_eq!(t.outstanding(), 1);
assert_eq!(
metrics.snapshot(crate::metrics::names::RPC_CALLS_TIMED_OUT),
Some(2)
);
}
#[test]
fn expire_overdue_at_exact_deadline_does_not_remove() {
let metrics = MetricsHub::new();
let mut t = OutstandingCalls::new(RpcConfig {
rpc_timeout_ms: 100,
..cfg(true)
});
issue(&mut t, "edge", 0);
let expired = t.expire_overdue(100, &metrics);
assert!(expired.is_empty());
assert_eq!(t.outstanding(), 1);
}
#[test]
fn from_amqp_handles_str_symbol_uuid_binary() {
let p = ReplyProperties::from_amqp(Some(&AmqpExtValue::Str("foo".into())));
assert_eq!(p.correlation_id, Some("foo".to_string()));
let p = ReplyProperties::from_amqp(Some(&AmqpExtValue::Symbol("sym".into())));
assert_eq!(p.correlation_id, Some("sym".to_string()));
let p = ReplyProperties::from_amqp(Some(&AmqpExtValue::Uuid([
0x55, 0xee, 0x00, 0x00, 0xaa, 0xbb, 0xcc, 0xdd, 0x12, 0x34, 0x56, 0x78, 0x9a, 0xbc,
0xde, 0xf0,
])));
assert_eq!(
p.correlation_id,
Some("55ee0000-aabb-ccdd-1234-56789abcdef0".to_string())
);
let p = ReplyProperties::from_amqp(Some(&AmqpExtValue::Binary(alloc::vec![0xab, 0xcd])));
assert_eq!(p.correlation_id, Some("abcd".to_string()));
}
#[test]
fn validate_reply_does_not_block_on_table_full() {
let mut t = OutstandingCalls::new(RpcConfig {
max_outstanding: 1,
..cfg(true)
});
issue(&mut t, "a", 0);
let r = t.issue("b".to_string(), 0);
assert_eq!(r, IssueDecision::OutOfResources);
}
#[test]
fn issue_capacity_zero_always_rejects() {
let mut t = OutstandingCalls::new(RpcConfig {
max_outstanding: 0,
..cfg(true)
});
assert_eq!(t.issue("anything".into(), 0), IssueDecision::OutOfResources);
}
#[test]
fn rpc_timeout_can_be_overridden() {
let c = RpcConfig {
rpc_timeout_ms: 5_000,
..RpcConfig::default()
};
let t = OutstandingCalls::new(c);
assert_eq!(t.config().rpc_timeout_ms, 5_000);
}
}