use std::ops::Deref;
use std::sync::Arc;
use std::time::Instant;
/// Immutable response bytes that are cheap to clone.
///
/// The only variant, [`Payload::Heap`], wraps an `Arc<[u8]>`, so cloning a
/// `Payload` bumps a reference count instead of copying the bytes.
#[derive(Debug, Clone)]
pub enum Payload {
    /// Bytes held in a shared, reference-counted slice.
    Heap(Arc<[u8]>),
}

impl Payload {
    /// Takes ownership of `v` and moves its bytes into a shared slice.
    ///
    /// The `Vec` is converted straight into an `Arc<[u8]>`. Going through
    /// `Vec::into_boxed_slice` first may shrink (and thereby reallocate) a
    /// buffer with spare capacity, only for `Arc::from` to copy the bytes
    /// into the `Arc` allocation afterwards.
    pub fn from_vec(v: Vec<u8>) -> Self {
        Self::Heap(Arc::from(v))
    }

    /// Returns a payload that contains no bytes.
    pub fn empty() -> Self {
        let no_bytes: &[u8] = &[];
        Self::Heap(Arc::from(no_bytes))
    }

    /// Wraps an already shared slice without copying its contents.
    pub fn from_arc(a: Arc<[u8]>) -> Self {
        Self::Heap(a)
    }

    /// Borrows the payload as a plain byte slice.
    pub fn as_bytes(&self) -> &[u8] {
        // Exhaustive on purpose: adding a variant must force a decision here.
        match self {
            Self::Heap(bytes) => bytes,
        }
    }

    /// Returns `true` when the payload holds zero bytes.
    pub fn is_empty(&self) -> bool {
        self.as_bytes().is_empty()
    }

    /// Returns the number of bytes in the payload.
    pub fn len(&self) -> usize {
        self.as_bytes().len()
    }

    /// Copies the bytes into a newly allocated `Vec<u8>`.
    pub fn to_vec(&self) -> Vec<u8> {
        self.as_bytes().to_vec()
    }
}
/// Lets a `Payload` be used where a `[u8]` is expected, so slice methods and
/// `&*payload` work directly on it.
impl Deref for Payload {
    type Target = [u8];

    /// Borrows the payload's bytes; delegates to `Payload::as_bytes`.
    fn deref(&self) -> &Self::Target {
        Self::as_bytes(self)
    }
}
/// Allows a `Payload` to be passed to APIs that accept `impl AsRef<[u8]>`.
impl AsRef<[u8]> for Payload {
    /// Borrows the payload's bytes; delegates to `Payload::as_bytes`.
    fn as_ref(&self) -> &[u8] {
        Self::as_bytes(self)
    }
}
/// Conversion from an owned byte vector; equivalent to `Payload::from_vec`.
impl From<Vec<u8>> for Payload {
    /// Consumes `bytes` and wraps them via `Payload::from_vec`.
    fn from(bytes: Vec<u8>) -> Self {
        Self::from_vec(bytes)
    }
}
/// Conversion from an already shared byte slice; equivalent to
/// `Payload::from_arc`.
impl From<Arc<[u8]>> for Payload {
    /// Wraps `shared` without copying the bytes it points to.
    fn from(shared: Arc<[u8]>) -> Self {
        Self::from_arc(shared)
    }
}
use crate::types::{Lsn, ReadConsistency, RequestId, TenantId, VShardId};
/// Request envelope: a [`PhysicalPlan`] together with the identifiers and
/// metadata stored alongside it.
///
/// All fields are public and the struct is built with a plain struct literal
/// (see the tests in this file); there is no constructor or validation here.
#[derive(Debug, Clone)]
pub struct Request {
/// Identifier of this request. `Response::request_id` has the same type,
/// presumably so a response can be matched to its request — confirm with the caller.
pub request_id: RequestId,
/// Tenant identifier attached to the request.
pub tenant_id: TenantId,
/// Virtual-shard identifier attached to the request
/// (meaning assumed from the `VShardId` name — TODO confirm).
pub vshard_id: VShardId,
/// The physical plan carried by this request.
pub plan: PhysicalPlan,
/// A `std::time::Instant` attached to the request.
/// NOTE(review): presumably the point after which the request should be
/// abandoned; enforcement is not visible in this file.
pub deadline: Instant,
/// Priority level; see [`Priority`] for the ordering.
pub priority: Priority,
/// Opaque 64-bit trace identifier.
pub trace_id: u64,
/// Read-consistency setting attached to the request.
pub consistency: ReadConsistency,
/// Optional 64-bit idempotency key; `None` when the caller supplies none.
/// NOTE(review): presumably used to detect repeated writes — confirm.
pub idempotency_key: Option<u64>,
}
/// Response envelope: outcome, result bytes and metadata for one request.
///
/// All fields are public; nothing in this file enforces consistency between
/// `status`, `partial` and `error_code`.
#[derive(Debug, Clone)]
pub struct Response {
/// Identifier with the same type as `Request::request_id`; presumably that
/// of the request being answered — confirm with the producer.
pub request_id: RequestId,
/// Coarse outcome; see [`Status`].
pub status: Status,
/// Attempt counter. The tests in this file use `1`; whether numbering
/// starts at 0 or 1 is not established here — TODO confirm.
pub attempt: u32,
/// Boolean flag stored next to `status`.
/// NOTE(review): its relationship to `Status::Partial` is not defined in
/// this file — confirm.
pub partial: bool,
/// Result bytes; this file's error-response test uses `Payload::empty()`.
pub payload: Payload,
/// LSN reported with the response.
/// NOTE(review): presumably the log position the result reflects — confirm.
pub watermark_lsn: Lsn,
/// Error classification; `None` in this file's successful-response test.
pub error_code: Option<ErrorCode>,
}
pub use super::physical_plan::PhysicalPlan;
/// Priority level carried in `Request::priority`.
///
/// The derived `PartialOrd`/`Ord` compare variants in declaration order, so
/// `Background < Normal < High < Critical`. Reordering the variants would
/// change that ordering.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum Priority {
/// Lowest level (discriminant 0).
Background = 0,
/// Discriminant 1; compares above `Background` and below `High`.
Normal = 1,
/// Discriminant 2; compares above `Normal` and below `Critical`.
High = 2,
/// Highest level (discriminant 3).
Critical = 3,
}
/// Coarse outcome stored in `Response::status`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Status {
/// Success; this file's `response_ok` test pairs it with `error_code: None`.
Ok,
/// NOTE(review): presumably an incomplete result; how it relates to the
/// separate `Response::partial` flag is not established in this file — confirm.
Partial,
/// Failure; this file's `response_error` test pairs it with `error_code: Some(..)`.
Error,
}
/// Error classification stored in `Response::error_code`.
///
/// Values can be produced from `crate::Error` through the `From` impl in this
/// file; variants that impl never yields are marked below.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ErrorCode {
/// Converted from `crate::Error::DeadlineExceeded`.
DeadlineExceeded,
/// Converted from `crate::Error::RejectedConstraint`; `constraint` is carried over unchanged.
RejectedConstraint { constraint: String },
/// Converted from `crate::Error::RejectedPrevalidation`; `reason` is carried over unchanged.
RejectedPrevalidation { reason: String },
/// Converted from either `crate::Error::CollectionNotFound` or
/// `crate::Error::DocumentNotFound`; the distinction is dropped.
NotFound,
/// Converted from `crate::Error::RejectedAuthz`.
RejectedAuthz,
/// Converted from `crate::Error::ConflictRetry`.
ConflictRetry,
/// Converted from `crate::Error::FanOutExceeded`.
FanOutExceeded,
/// Converted from `crate::Error::MemoryExhausted`.
ResourcesExhausted,
/// Not produced by the `From<crate::Error>` impl in this file.
/// NOTE(review): presumably an edge that references a node (`missing_node`)
/// which does not exist — confirm with the producer.
RejectedDanglingEdge { missing_node: String },
/// Not produced by the `From<crate::Error>` impl in this file.
/// NOTE(review): presumably related to `Request::idempotency_key` — confirm.
DuplicateWrite,
/// Fallback for every `crate::Error` variant without a dedicated mapping;
/// `detail` is that error's `to_string()` output.
Internal { detail: String },
}
impl From<crate::Error> for ErrorCode {
fn from(e: crate::Error) -> Self {
match e {
crate::Error::DeadlineExceeded { .. } => Self::DeadlineExceeded,
crate::Error::RejectedConstraint { constraint, .. } => {
Self::RejectedConstraint { constraint }
}
crate::Error::RejectedPrevalidation { reason, .. } => {
Self::RejectedPrevalidation { reason }
}
crate::Error::CollectionNotFound { .. } | crate::Error::DocumentNotFound { .. } => {
Self::NotFound
}
crate::Error::RejectedAuthz { .. } => Self::RejectedAuthz,
crate::Error::ConflictRetry { .. } => Self::ConflictRetry,
crate::Error::FanOutExceeded { .. } => Self::FanOutExceeded,
crate::Error::MemoryExhausted { .. } => Self::ResourcesExhausted,
other => Self::Internal {
detail: other.to_string(),
},
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::bridge::physical_plan::{DocumentOp, MetaOp};
    use std::time::Duration;

    /// Fixture: a normal-priority, strongly consistent point-get of
    /// document `doc-1` in collection `users`, due five seconds from now.
    fn sample_request() -> Request {
        let plan = PhysicalPlan::Document(DocumentOp::PointGet {
            collection: "users".into(),
            document_id: "doc-1".into(),
            rls_filters: Vec::new(),
        });
        let deadline = Instant::now() + Duration::from_secs(5);

        Request {
            request_id: RequestId::new(1),
            tenant_id: TenantId::new(1),
            vshard_id: VShardId::new(0),
            plan,
            deadline,
            priority: Priority::Normal,
            trace_id: 0xABCD,
            consistency: ReadConsistency::Strong,
            idempotency_key: None,
        }
    }

    /// The fixture's identifiers can be read back through the public fields.
    #[test]
    fn request_fields_accessible() {
        let req = sample_request();

        let expected_request = RequestId::new(1);
        let expected_tenant = TenantId::new(1);
        assert_eq!(req.request_id, expected_request);
        assert_eq!(req.tenant_id, expected_tenant);
        assert_eq!(req.trace_id, 0xABCD);
    }

    /// A successful response exposes its status, LSN and payload bytes.
    #[test]
    fn response_ok() {
        let body: Arc<[u8]> = Arc::from(b"result".as_slice());
        let resp = Response {
            request_id: RequestId::new(1),
            status: Status::Ok,
            attempt: 1,
            partial: false,
            payload: Payload::from_arc(body),
            watermark_lsn: Lsn::new(42),
            error_code: None,
        };

        assert_eq!(resp.status, Status::Ok);
        assert_eq!(resp.watermark_lsn, Lsn::new(42));
        // Goes through `Deref`, like the `&*resp.payload` form.
        let bytes: &[u8] = &resp.payload;
        assert_eq!(bytes, b"result");
    }

    /// An error response carries the error code it was built with.
    #[test]
    fn response_error() {
        let resp = Response {
            request_id: RequestId::new(2),
            status: Status::Error,
            attempt: 1,
            partial: false,
            payload: Payload::empty(),
            watermark_lsn: Lsn::ZERO,
            error_code: Some(ErrorCode::DeadlineExceeded),
        };

        let expected = Some(ErrorCode::DeadlineExceeded);
        assert_eq!(resp.error_code, expected);
    }

    /// Each priority compares strictly below the next one in declaration order.
    #[test]
    fn priority_ordering() {
        let ascending = [
            Priority::Background,
            Priority::Normal,
            Priority::High,
            Priority::Critical,
        ];
        for pair in ascending.windows(2) {
            assert!(pair[0] < pair[1]);
        }
    }

    /// A cancel plan keeps the id of the request it targets.
    #[test]
    fn cancel_plan() {
        let plan = PhysicalPlan::Meta(MetaOp::Cancel {
            target_request_id: RequestId::new(42),
        });
        let req = Request {
            request_id: RequestId::new(99),
            tenant_id: TenantId::new(1),
            vshard_id: VShardId::new(0),
            plan,
            deadline: Instant::now() + Duration::from_secs(1),
            priority: Priority::Critical,
            trace_id: 0,
            consistency: ReadConsistency::Eventual,
            idempotency_key: None,
        };

        if let PhysicalPlan::Meta(MetaOp::Cancel { target_request_id }) = req.plan {
            assert_eq!(target_request_id, RequestId::new(42));
        } else {
            panic!("expected Cancel plan");
        }
    }
}