use std::ops::Deref;
use std::sync::Arc;
use std::time::Instant;
/// Immutable byte buffer that is cheap to clone.
///
/// The single variant keeps the bytes in an `Arc<[u8]>`, so the derived
/// `Clone` bumps a reference count instead of copying the bytes.
#[derive(Debug, Clone)]
pub enum Payload {
    /// Bytes stored in a shared, reference-counted slice.
    Heap(Arc<[u8]>),
}

impl Payload {
    /// Builds a payload from the contents of `v`, consuming the vector.
    ///
    /// The vector is converted straight into an `Arc<[u8]>`. Detouring through
    /// `into_boxed_slice` may first reallocate to drop spare capacity and then
    /// copy the bytes a second time into the `Arc` allocation.
    pub fn from_vec(v: Vec<u8>) -> Self {
        Self::Heap(Arc::from(v))
    }

    /// Returns a payload holding zero bytes.
    pub fn empty() -> Self {
        let no_bytes: &[u8] = &[];
        Self::Heap(Arc::from(no_bytes))
    }

    /// Borrows the payload contents as a byte slice.
    pub fn as_bytes(&self) -> &[u8] {
        // Kept as a `match` so that adding a variant forces this to be revisited.
        match self {
            Self::Heap(bytes) => bytes,
        }
    }

    /// Returns `true` when the payload holds no bytes.
    pub fn is_empty(&self) -> bool {
        self.as_bytes().is_empty()
    }

    /// Returns the number of bytes in the payload.
    pub fn len(&self) -> usize {
        self.as_bytes().len()
    }

    /// Copies the payload contents into a freshly allocated `Vec<u8>`.
    pub fn to_vec(&self) -> Vec<u8> {
        self.as_bytes().to_vec()
    }
}

/// Lets a `Payload` be used wherever a `&[u8]` is expected.
impl Deref for Payload {
    type Target = [u8];

    fn deref(&self) -> &[u8] {
        self.as_bytes()
    }
}

impl AsRef<[u8]> for Payload {
    fn as_ref(&self) -> &[u8] {
        self.as_bytes()
    }
}

/// Same conversion as [`Payload::from_vec`].
impl From<Vec<u8>> for Payload {
    fn from(v: Vec<u8>) -> Self {
        Self::from_vec(v)
    }
}

/// Wraps an existing shared slice without copying its bytes.
impl From<Arc<[u8]>> for Payload {
    fn from(a: Arc<[u8]>) -> Self {
        Self::Heap(a)
    }
}
use crate::event::types::EventSource;
use crate::types::{DatabaseId, Lsn, ReadConsistency, RequestId, TenantId, TraceId, VShardId};
/// Request record: identifiers (request, tenant, database, vShard), the
/// [`PhysicalPlan`] it carries, and accompanying caller metadata.
///
/// All fields are public. The struct derives `Debug` and `Clone`; cloning
/// copies `user_roles` but only bumps reference counts for the `Arc<str>`
/// fields.
#[derive(Debug, Clone)]
pub struct Request {
/// Identifier of this request.
pub request_id: RequestId,
/// Tenant the request is issued for.
pub tenant_id: TenantId,
/// Database the request is issued against.
pub database_id: DatabaseId,
/// vShard identifier attached to the request.
pub vshard_id: VShardId,
/// Physical plan carried by the request.
pub plan: PhysicalPlan,
/// Deadline as an absolute `Instant` (the tests in this file build it as
/// `Instant::now() + Duration`).
/// NOTE(review): where and how the deadline is enforced is not visible here.
pub deadline: Instant,
/// Priority level; see [`Priority`].
pub priority: Priority,
/// Trace identifier attached to the request.
pub trace_id: TraceId,
/// Read-consistency level requested for this request.
pub consistency: ReadConsistency,
/// Optional 64-bit idempotency key.
/// NOTE(review): de-duplication semantics are not established in this file.
pub idempotency_key: Option<u64>,
/// Origin of the request as an [`EventSource`] (the tests in this file use
/// `EventSource::User`).
pub event_source: EventSource,
/// Role names associated with the caller.
/// Presumably consulted for authorization; TODO confirm against callers.
pub user_roles: Vec<String>,
/// Optional caller identifier, shared via `Arc<str>`.
pub user_id: Option<Arc<str>>,
/// Optional statement digest, shared via `Arc<str>`.
/// Presumably a fingerprint of the originating statement; TODO confirm.
pub statement_digest: Option<Arc<str>>,
}
/// Response record: a [`Status`], a [`Payload`] of bytes, and an optional
/// [`ErrorCode`], tagged with a `RequestId`.
///
/// Derives `Debug` and `Clone`; cloning shares the payload bytes because
/// [`Payload`] stores them behind an `Arc`.
#[derive(Debug, Clone)]
pub struct Response {
/// Request identifier carried by this response.
/// Presumably the id of the request being answered; confirm with callers.
pub request_id: RequestId,
/// Outcome category; see [`Status`].
pub status: Status,
/// Attempt counter (the tests in this file use `1`).
/// NOTE(review): whether counting starts at 0 or 1 is not established here.
pub attempt: u32,
/// Partial-result flag.
/// NOTE(review): its relationship to `Status::Partial` is not established
/// in this file.
pub partial: bool,
/// Response bytes.
pub payload: Payload,
/// LSN watermark reported with the response.
/// NOTE(review): exact semantics are defined outside this file.
pub watermark_lsn: Lsn,
/// Error detail, if any. The tests in this file pair `Status::Ok` with
/// `None` and `Status::Error` with `Some(..)`.
pub error_code: Option<ErrorCode>,
}
pub use super::physical_plan::PhysicalPlan;
/// Priority level attached to a [`Request`].
///
/// Variants carry explicit discriminants 0 through 3 in ascending order, so
/// the derived `PartialOrd`/`Ord` yield
/// `Background < Normal < High < Critical`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum Priority {
/// Lowest level (discriminant 0).
Background = 0,
/// Discriminant 1.
Normal = 1,
/// Discriminant 2.
High = 2,
/// Highest level (discriminant 3).
Critical = 3,
}
/// Outcome category carried in [`Response::status`].
///
/// A small `Copy` enum that supports equality comparison but no ordering.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Status {
/// Success.
Ok,
/// Partial outcome.
/// NOTE(review): how this relates to `Response::partial` is not shown here.
Partial,
/// Failure; the tests in this file pair it with `error_code: Some(..)`.
Error,
}
/// Error detail carried in [`Response::error_code`].
///
/// The `From<crate::Error>` impl in this file produces most of these
/// variants. The variants it does not produce are marked below and must be
/// constructed elsewhere. Any `crate::Error` without a dedicated mapping
/// becomes [`ErrorCode::Internal`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ErrorCode {
/// Mapped from `crate::Error::DeadlineExceeded`.
DeadlineExceeded,
/// Mapped from `crate::Error::RejectedConstraint`, keeping `constraint`
/// and `detail`.
RejectedConstraint { constraint: String, detail: String },
/// Mapped from `crate::Error::RejectedPrevalidation`, keeping `reason`.
RejectedPrevalidation { reason: String },
/// Mapped from both `crate::Error::CollectionNotFound` and
/// `crate::Error::DocumentNotFound`; the distinction is not preserved.
NotFound,
/// Mapped from `crate::Error::RejectedAuthz`.
RejectedAuthz,
/// Mapped from `crate::Error::ConflictRetry`.
ConflictRetry,
/// Mapped from `crate::Error::FanOutExceeded`.
FanOutExceeded,
/// Mapped from both `crate::Error::MemoryExhausted` and
/// `crate::Error::Backpressure`.
ResourcesExhausted,
/// Not produced by the `From<crate::Error>` impl in this file.
RejectedDanglingEdge { missing_node: String },
/// Not produced by the `From<crate::Error>` impl in this file.
DuplicateWrite,
/// Mapped from the same-named `crate::Error` variant, keeping `collection`.
AppendOnlyViolation { collection: String },
/// Mapped from the same-named `crate::Error` variant, keeping `collection`
/// and `detail`.
BalanceViolation { collection: String, detail: String },
/// Mapped from the same-named `crate::Error` variant, keeping `collection`.
PeriodLocked { collection: String },
/// Mapped from the same-named `crate::Error` variant, keeping `collection`.
RetentionViolation { collection: String },
/// Mapped from the same-named `crate::Error` variant, keeping `collection`.
LegalHoldActive { collection: String },
/// Mapped from the same-named `crate::Error` variant, keeping `collection`
/// and `detail`.
StateTransitionViolation { collection: String, detail: String },
/// Mapped from the same-named `crate::Error` variant, keeping `collection`.
TransitionCheckViolation { collection: String },
/// Mapped from the same-named `crate::Error` variant, keeping `collection`
/// and `detail`.
TypeGuardViolation { collection: String, detail: String },
/// Mapped from the same-named `crate::Error` variant, keeping `collection`
/// and `detail`.
TypeMismatch { collection: String, detail: String },
/// Mapped from the same-named `crate::Error` variant, keeping `collection`.
OverflowError { collection: String },
/// Mapped from the same-named `crate::Error` variant, keeping `collection`
/// and `detail`.
InsufficientBalance { collection: String, detail: String },
/// Mapped from `crate::Error::RateExceeded`, keeping `gate` and
/// `retry_after_ms` (going by the name, a delay in milliseconds; confirm).
RateExceeded { gate: String, retry_after_ms: u64 },
/// Not produced by the `From<crate::Error>` impl in this file.
CollectionDraining { collection: String },
/// Not produced by the `From<crate::Error>` impl in this file.
RecursionDepthExceeded { cte_name: String, max_depth: usize },
/// Fallback for every `crate::Error` without a dedicated mapping; `detail`
/// is that error's `to_string()` output.
Internal { detail: String },
/// Not produced by the `From<crate::Error>` impl in this file.
Unsupported { detail: String },
/// Not produced by the `From<crate::Error>` impl in this file.
RollbackFailed { entry_index: usize, detail: String },
/// Not produced by the `From<crate::Error>` impl in this file.
OllpRetryRequired,
}
impl From<crate::Error> for ErrorCode {
fn from(e: crate::Error) -> Self {
match e {
crate::Error::DeadlineExceeded { .. } => Self::DeadlineExceeded,
crate::Error::RejectedConstraint {
constraint, detail, ..
} => Self::RejectedConstraint { constraint, detail },
crate::Error::RejectedPrevalidation { reason, .. } => {
Self::RejectedPrevalidation { reason }
}
crate::Error::CollectionNotFound { .. } | crate::Error::DocumentNotFound { .. } => {
Self::NotFound
}
crate::Error::RejectedAuthz { .. } => Self::RejectedAuthz,
crate::Error::ConflictRetry { .. } => Self::ConflictRetry,
crate::Error::FanOutExceeded { .. } => Self::FanOutExceeded,
crate::Error::MemoryExhausted { .. } => Self::ResourcesExhausted,
crate::Error::Backpressure { .. } => Self::ResourcesExhausted,
crate::Error::AppendOnlyViolation { collection, .. } => {
Self::AppendOnlyViolation { collection }
}
crate::Error::BalanceViolation {
collection, detail, ..
} => Self::BalanceViolation { collection, detail },
crate::Error::PeriodLocked { collection, .. } => Self::PeriodLocked { collection },
crate::Error::RetentionViolation { collection, .. } => {
Self::RetentionViolation { collection }
}
crate::Error::LegalHoldActive { collection, .. } => {
Self::LegalHoldActive { collection }
}
crate::Error::StateTransitionViolation {
collection, detail, ..
} => Self::StateTransitionViolation { collection, detail },
crate::Error::TransitionCheckViolation { collection, .. } => {
Self::TransitionCheckViolation { collection }
}
crate::Error::TypeGuardViolation {
collection, detail, ..
} => Self::TypeGuardViolation { collection, detail },
crate::Error::TypeMismatch {
collection, detail, ..
} => Self::TypeMismatch { collection, detail },
crate::Error::OverflowError { collection, .. } => Self::OverflowError { collection },
crate::Error::InsufficientBalance {
collection, detail, ..
} => Self::InsufficientBalance { collection, detail },
crate::Error::RateExceeded {
gate,
retry_after_ms,
..
} => Self::RateExceeded {
gate,
retry_after_ms,
},
other => Self::Internal {
detail: other.to_string(),
},
}
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::bridge::physical_plan::{DocumentOp, MetaOp};
    use std::time::Duration;

    /// Builds a user-originated point-get request for `users`/`doc-1` whose
    /// deadline lies five seconds after the moment of construction.
    fn sample_request() -> Request {
        let point_get = DocumentOp::PointGet {
            collection: "users".into(),
            document_id: "doc-1".into(),
            surrogate: nodedb_types::Surrogate::ZERO,
            pk_bytes: Vec::new(),
            rls_filters: Vec::new(),
            system_as_of_ms: None,
            valid_at_ms: None,
        };

        Request {
            request_id: RequestId::new(1),
            tenant_id: TenantId::new(1),
            database_id: DatabaseId::DEFAULT,
            vshard_id: VShardId::new(0),
            plan: PhysicalPlan::Document(point_get),
            deadline: Instant::now() + Duration::from_secs(5),
            priority: Priority::Normal,
            trace_id: TraceId::generate(),
            consistency: ReadConsistency::Strong,
            idempotency_key: None,
            event_source: crate::event::EventSource::User,
            user_roles: Vec::new(),
            user_id: None,
            statement_digest: None,
        }
    }

    /// The identifiers set by `sample_request` read back unchanged, and a
    /// generated trace id differs from the zero id.
    #[test]
    fn request_fields_accessible() {
        let request = sample_request();

        assert_eq!(request.request_id, RequestId::new(1));
        assert_eq!(request.tenant_id, TenantId::new(1));
        assert_ne!(request.trace_id, TraceId::ZERO);
    }

    /// A successful response exposes its status, watermark and payload bytes.
    #[test]
    fn response_ok() {
        let body = Payload::from_vec(b"result".to_vec());
        let response = Response {
            request_id: RequestId::new(1),
            status: Status::Ok,
            attempt: 1,
            partial: false,
            payload: body,
            watermark_lsn: Lsn::new(42),
            error_code: None,
        };

        assert_eq!(response.status, Status::Ok);
        assert_eq!(response.watermark_lsn, Lsn::new(42));
        assert_eq!(&*response.payload, b"result");
    }

    /// An error response hands back the error code it was built with.
    #[test]
    fn response_error() {
        let response = Response {
            request_id: RequestId::new(2),
            status: Status::Error,
            attempt: 1,
            partial: false,
            payload: Payload::empty(),
            watermark_lsn: Lsn::ZERO,
            error_code: Some(ErrorCode::DeadlineExceeded),
        };

        assert_eq!(response.error_code, Some(ErrorCode::DeadlineExceeded));
    }

    /// Each priority level compares strictly below the next one.
    #[test]
    fn priority_ordering() {
        let ascending = [
            Priority::Background,
            Priority::Normal,
            Priority::High,
            Priority::Critical,
        ];

        for pair in ascending.windows(2) {
            assert!(pair[0] < pair[1]);
        }
    }

    /// A cancel plan keeps the id of the request it targets.
    #[test]
    fn cancel_plan() {
        let cancel = MetaOp::Cancel {
            target_request_id: RequestId::new(42),
        };
        let request = Request {
            request_id: RequestId::new(99),
            tenant_id: TenantId::new(1),
            database_id: DatabaseId::DEFAULT,
            vshard_id: VShardId::new(0),
            plan: PhysicalPlan::Meta(cancel),
            deadline: Instant::now() + Duration::from_secs(1),
            priority: Priority::Critical,
            trace_id: TraceId::ZERO,
            consistency: ReadConsistency::Eventual,
            idempotency_key: None,
            event_source: crate::event::EventSource::User,
            user_roles: Vec::new(),
            user_id: None,
            statement_digest: None,
        };

        if let PhysicalPlan::Meta(MetaOp::Cancel { target_request_id }) = request.plan {
            assert_eq!(target_request_id, RequestId::new(42));
        } else {
            panic!("expected Cancel plan");
        }
    }
}