use crate::cx::Cx;
use crate::messaging::redis::{RedisProtocolLimits, RespValue};
/// Errors produced by protocol adapter operations.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProtocolAdapterError {
    /// The operation was cancelled before it ran.
    Cancelled,
    /// The operation is not allowed in the adapter's current connection state.
    Lifecycle {
        /// Name of the adapter reporting the error.
        adapter: &'static str,
        /// Human-readable explanation.
        detail: String,
    },
    /// A message could not be encoded.
    Encode {
        /// Name of the adapter reporting the error.
        adapter: &'static str,
        /// Human-readable explanation.
        detail: String,
    },
    /// Input could not be decoded into a message.
    Decode {
        /// Name of the adapter reporting the error.
        adapter: &'static str,
        /// Human-readable explanation.
        detail: String,
    },
}

impl std::fmt::Display for ProtocolAdapterError {
    /// Renders `"<adapter> <phase> error: <detail>"` for the adapter-tagged variants and a
    /// fixed sentence for [`ProtocolAdapterError::Cancelled`].
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Every adapter-tagged variant shares one layout; only the phase word differs.
        let (adapter, phase, detail) = match self {
            Self::Cancelled => return f.write_str("protocol adapter operation cancelled"),
            Self::Lifecycle { adapter, detail } => (adapter, "lifecycle", detail),
            Self::Encode { adapter, detail } => (adapter, "encode", detail),
            Self::Decode { adapter, detail } => (adapter, "decode", detail),
        };
        write!(f, "{adapter} {phase} error: {detail}")
    }
}

impl std::error::Error for ProtocolAdapterError {}
/// Connection lifecycle state tracked by a protocol adapter.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProtocolConnectionState {
/// Starting state; the transport has not reported a connection yet.
Idle,
/// The transport is connected and the adapter is ready.
Ready,
/// A drain was requested; in-flight work is being drained.
Draining,
/// The transport is closed.
Closed,
}
/// Transport-level events fed to `ProtocolAdapter::on_transport_event`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ProtocolTransportEvent {
/// The transport connected.
Connected,
/// A drain of the connection was requested.
DrainRequested,
/// The transport closed.
Closed,
/// The transport was reset.
Reset,
}
/// Capability flags and feature names advertised by a protocol adapter.
///
/// The derived `Default` has every flag `false` and an empty feature list.
#[derive(Debug, Clone, PartialEq, Eq, Default)]
pub struct ProtocolCapabilities {
/// Whether pipelined requests are advertised.
pub pipelined_requests: bool,
/// Whether request/reply exchanges are advertised.
pub request_reply: bool,
/// Whether streaming publish is advertised.
pub streaming_publish: bool,
/// Names of additional features the adapter advertises.
pub features: Vec<&'static str>,
}
/// Result of `ProtocolAdapter::begin_handshake`: who the adapter is and what it advertises.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProtocolNegotiation {
/// Name of the adapter that produced this negotiation.
pub adapter_name: &'static str,
/// Identifier of the protocol family the adapter speaks.
pub protocol_family: &'static str,
/// Optional protocol version label (the RESP adapter in this file reports `"RESP2"`).
pub version_hint: Option<&'static str>,
/// Capabilities advertised by the adapter.
pub capabilities: ProtocolCapabilities,
}
/// Snapshot returned by `ProtocolAdapter::health_check`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ProtocolHealth {
/// Connection state at the time of the check.
pub state: ProtocolConnectionState,
/// Whether the adapter considers itself ready.
pub ready: bool,
/// Short static description of the state.
pub detail: &'static str,
}
/// A successfully decoded message paired with how much input it used.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DecodedProtocolMessage<M> {
/// The decoded message.
pub message: M,
/// Amount of the input slice consumed by `message`; the round-trip test in this file
/// expects it to equal the encoded length in bytes.
pub consumed: usize,
}
/// Common interface for protocol adapters: identification, handshake negotiation,
/// message encoding/decoding, and connection lifecycle tracking.
pub trait ProtocolAdapter: Send + Sync + 'static {
/// Message type this adapter encodes and decodes.
type Message: Clone + Send + Sync + 'static;
/// Returns the adapter's static name.
fn adapter_name(&self) -> &'static str;
/// Returns the static identifier of the protocol family.
fn protocol_family(&self) -> &'static str;
/// Returns the adapter's current connection state.
fn connection_state(&self) -> ProtocolConnectionState;
/// Produces the negotiation summary for this adapter.
///
/// # Errors
/// Returns a `ProtocolAdapterError` when negotiation cannot proceed.
fn begin_handshake(&self, cx: &Cx) -> Result<ProtocolNegotiation, ProtocolAdapterError>;
/// Returns the capabilities the adapter advertises.
fn capabilities(&self) -> ProtocolCapabilities;
/// Writes the encoded form of `message` into `out`.
///
/// # Errors
/// Returns a `ProtocolAdapterError` when the message cannot be encoded.
fn encode_message(
&self,
message: &Self::Message,
out: &mut Vec<u8>,
) -> Result<(), ProtocolAdapterError>;
/// Attempts to decode one message from `input`.
///
/// On success the `Option` holds the message and how much input it consumed.
/// NOTE(review): `Ok(None)` presumably means `input` does not yet contain a complete
/// message — confirm against the decoders used by implementations.
///
/// # Errors
/// Returns a `ProtocolAdapterError` when decoding fails.
fn try_decode_message(
&self,
input: &[u8],
) -> Result<Option<DecodedProtocolMessage<Self::Message>>, ProtocolAdapterError>;
/// Applies a transport event and returns the connection state after the event.
///
/// # Errors
/// Returns a `ProtocolAdapterError` when the event cannot be applied.
fn on_transport_event(
&mut self,
cx: &Cx,
event: ProtocolTransportEvent,
) -> Result<ProtocolConnectionState, ProtocolAdapterError>;
/// Reports the adapter's current health.
///
/// # Errors
/// Returns a `ProtocolAdapterError` when the check cannot be performed.
fn health_check(&self, cx: &Cx) -> Result<ProtocolHealth, ProtocolAdapterError>;
}
/// `ProtocolAdapter` implementation whose messages are `RespValue` frames.
#[derive(Debug, Clone)]
pub struct RespProtocolAdapter {
// Limits handed to `RespValue::try_decode_with_limits` when decoding.
limits: RedisProtocolLimits,
// Current connection state; starts at `Idle` and is advanced by `on_transport_event`.
state: ProtocolConnectionState,
}
impl RespProtocolAdapter {
    /// Creates an adapter that decodes with `limits` and starts in
    /// [`ProtocolConnectionState::Idle`].
    #[must_use]
    pub fn new(limits: RedisProtocolLimits) -> Self {
        let state = ProtocolConnectionState::Idle;
        Self { limits, state }
    }

    /// Returns a copy of the limits this adapter was constructed with.
    #[must_use]
    pub const fn limits(&self) -> RedisProtocolLimits {
        self.limits
    }
}
impl Default for RespProtocolAdapter {
    /// Builds an adapter from `RedisProtocolLimits::default()`.
    fn default() -> Self {
        let limits = RedisProtocolLimits::default();
        Self::new(limits)
    }
}
impl ProtocolAdapter for RespProtocolAdapter {
type Message = RespValue;
fn adapter_name(&self) -> &'static str {
"redis-resp-adapter"
}
fn protocol_family(&self) -> &'static str {
"redis-resp"
}
fn connection_state(&self) -> ProtocolConnectionState {
self.state
}
fn begin_handshake(&self, cx: &Cx) -> Result<ProtocolNegotiation, ProtocolAdapterError> {
cx.checkpoint()
.map_err(|_| ProtocolAdapterError::Cancelled)?;
if self.state == ProtocolConnectionState::Closed {
return Err(ProtocolAdapterError::Lifecycle {
adapter: self.adapter_name(),
detail: "cannot negotiate after transport close".to_string(),
});
}
Ok(ProtocolNegotiation {
adapter_name: self.adapter_name(),
protocol_family: self.protocol_family(),
version_hint: Some("RESP2"),
capabilities: self.capabilities(),
})
}
fn capabilities(&self) -> ProtocolCapabilities {
ProtocolCapabilities {
pipelined_requests: true,
request_reply: true,
streaming_publish: false,
features: vec![
"bulk_strings",
"arrays",
"integers",
"simple_strings",
"error_frames",
],
}
}
fn encode_message(
&self,
message: &Self::Message,
out: &mut Vec<u8>,
) -> Result<(), ProtocolAdapterError> {
if self.state == ProtocolConnectionState::Closed {
return Err(ProtocolAdapterError::Lifecycle {
adapter: self.adapter_name(),
detail: "cannot encode after transport close".to_string(),
});
}
message.encode_into(out);
Ok(())
}
fn try_decode_message(
&self,
input: &[u8],
) -> Result<Option<DecodedProtocolMessage<Self::Message>>, ProtocolAdapterError> {
if self.state == ProtocolConnectionState::Closed {
return Err(ProtocolAdapterError::Lifecycle {
adapter: self.adapter_name(),
detail: "cannot decode after transport close".to_string(),
});
}
RespValue::try_decode_with_limits(input, &self.limits)
.map(|decoded| {
decoded.map(|(message, consumed)| DecodedProtocolMessage { message, consumed })
})
.map_err(|err| ProtocolAdapterError::Decode {
adapter: self.adapter_name(),
detail: err.to_string(),
})
}
fn on_transport_event(
&mut self,
cx: &Cx,
event: ProtocolTransportEvent,
) -> Result<ProtocolConnectionState, ProtocolAdapterError> {
cx.checkpoint()
.map_err(|_| ProtocolAdapterError::Cancelled)?;
let next = match (self.state, event) {
(ProtocolConnectionState::Idle, ProtocolTransportEvent::Connected) => {
ProtocolConnectionState::Ready
}
(
ProtocolConnectionState::Idle
| ProtocolConnectionState::Ready
| ProtocolConnectionState::Draining,
ProtocolTransportEvent::Closed | ProtocolTransportEvent::Reset,
) => ProtocolConnectionState::Closed,
(ProtocolConnectionState::Ready, ProtocolTransportEvent::DrainRequested) => {
ProtocolConnectionState::Draining
}
(ProtocolConnectionState::Closed, _) => {
return Err(ProtocolAdapterError::Lifecycle {
adapter: self.adapter_name(),
detail: "adapter is already closed".to_string(),
});
}
_ => {
return Err(ProtocolAdapterError::Lifecycle {
adapter: self.adapter_name(),
detail: format!("event {event:?} is invalid from state {:?}", self.state),
});
}
};
self.state = next;
Ok(self.state)
}
fn health_check(&self, cx: &Cx) -> Result<ProtocolHealth, ProtocolAdapterError> {
cx.checkpoint()
.map_err(|_| ProtocolAdapterError::Cancelled)?;
let detail = match self.state {
ProtocolConnectionState::Idle => "waiting for transport connect",
ProtocolConnectionState::Ready => "adapter ready",
ProtocolConnectionState::Draining => "draining in-flight work",
ProtocolConnectionState::Closed => "transport closed",
};
Ok(ProtocolHealth {
state: self.state,
ready: self.state == ProtocolConnectionState::Ready,
detail,
})
}
}
#[cfg(test)]
mod tests {
    use super::{
        ProtocolAdapter, ProtocolAdapterError, ProtocolConnectionState, ProtocolTransportEvent,
        RespProtocolAdapter,
    };
    use crate::cx::Cx;
    use crate::messaging::redis::RespValue;
    use crate::types::{Budget, RegionId, TaskId};

    /// Builds a test context whose region and task ids both use `slot`, with
    /// `Budget::INFINITE`.
    fn test_cx(slot: u32) -> Cx {
        let region = RegionId::new_for_test(slot, 0);
        let task = TaskId::new_for_test(slot, 0);
        Cx::new(region, task, Budget::INFINITE)
    }

    /// The handshake reports the adapter's names, version hint and key capabilities.
    #[test]
    fn resp_adapter_reports_handshake_capabilities() {
        let cx = test_cx(1);
        let adapter = RespProtocolAdapter::default();

        let negotiation = adapter.begin_handshake(&cx).expect("handshake succeeds");

        assert_eq!(negotiation.adapter_name, "redis-resp-adapter");
        assert_eq!(negotiation.protocol_family, "redis-resp");
        assert_eq!(negotiation.version_hint, Some("RESP2"));
        let caps = &negotiation.capabilities;
        assert!(caps.pipelined_requests);
        assert!(caps.request_reply);
        assert!(caps.features.contains(&"bulk_strings"));
    }

    /// Encoding a frame and decoding the bytes yields the same frame and consumes all bytes.
    #[test]
    fn resp_adapter_round_trips_resp_frames() {
        fn bulk(bytes: &[u8]) -> RespValue {
            RespValue::BulkString(Some(bytes.to_vec()))
        }

        let adapter = RespProtocolAdapter::default();
        let frame = RespValue::Array(Some(vec![bulk(b"PING"), bulk(b"payload")]));

        let mut wire = Vec::new();
        adapter
            .encode_message(&frame, &mut wire)
            .expect("encode succeeds");

        let maybe_decoded = adapter.try_decode_message(&wire).expect("decode succeeds");
        let decoded = maybe_decoded.expect("full frame available");

        assert_eq!(decoded.message, frame);
        assert_eq!(decoded.consumed, wire.len());
    }

    /// Walks Idle -> Ready -> Draining -> Closed and checks readiness along the way.
    #[test]
    fn resp_adapter_tracks_lifecycle_and_health() {
        let cx = test_cx(2);
        let mut adapter = RespProtocolAdapter::default();
        assert_eq!(adapter.connection_state(), ProtocolConnectionState::Idle);

        let after_connect = adapter.on_transport_event(&cx, ProtocolTransportEvent::Connected);
        assert_eq!(after_connect, Ok(ProtocolConnectionState::Ready));
        assert!(adapter.health_check(&cx).expect("health").ready);

        let after_drain =
            adapter.on_transport_event(&cx, ProtocolTransportEvent::DrainRequested);
        assert_eq!(after_drain, Ok(ProtocolConnectionState::Draining));

        let after_close = adapter.on_transport_event(&cx, ProtocolTransportEvent::Closed);
        assert_eq!(after_close, Ok(ProtocolConnectionState::Closed));
        assert!(!adapter.health_check(&cx).expect("health").ready);
    }

    /// Once closed, the adapter refuses a later `Connected` event.
    #[test]
    fn resp_adapter_rejects_reopen_after_close() {
        let cx = test_cx(3);
        let mut adapter = RespProtocolAdapter::default();
        adapter
            .on_transport_event(&cx, ProtocolTransportEvent::Closed)
            .expect("initial close succeeds");

        let reopen = adapter.on_transport_event(&cx, ProtocolTransportEvent::Connected);
        let err = reopen.expect_err("closed adapter should reject reconnect");

        assert!(matches!(err, ProtocolAdapterError::Lifecycle { .. }));
    }

    /// A context with cancellation requested makes the handshake fail with `Cancelled`.
    #[test]
    fn resp_adapter_observes_cancellation() {
        let cx = test_cx(4);
        cx.set_cancel_requested(true);
        let adapter = RespProtocolAdapter::default();

        let outcome = adapter.begin_handshake(&cx);
        let err = outcome.expect_err("cancelled cx should fail handshake");

        assert_eq!(err, ProtocolAdapterError::Cancelled);
    }
}