#[cfg(feature = "axum")]
pub mod extract;
pub mod json;
pub mod proto;
pub mod s2s;
pub mod sse;
use std::time::Duration;
use futures::stream::BoxStream;
use itertools::Itertools as _;
use s2_common::{
encryption::EncryptionSpec,
record,
types::{
self,
stream::{StreamName, StreamNamePrefix, StreamNameStartAfter},
},
};
use serde::{Deserialize, Serialize};
use time::OffsetDateTime;
use super::config::StreamConfig;
use crate::{data::Format, mime::JsonOrProto};
/// Stream metadata as exposed by the API.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct StreamInfo {
    /// Stream name.
    pub name: StreamName,
    /// Creation time in RFC 3339 format.
    #[serde(with = "time::serde::rfc3339")]
    pub created_at: OffsetDateTime,
    /// Deletion time in RFC 3339 format, when one is set.
    // `default` is required here: once `with` is used, serde no longer treats an
    // `Option` field as optional, so a payload lacking this key would be rejected
    // instead of deserializing to `None`.
    #[serde(default, with = "time::serde::rfc3339::option")]
    pub deleted_at: Option<OffsetDateTime>,
}
/// Converts the internal stream info into its API representation by moving
/// the name and both timestamps across unchanged.
impl From<types::stream::StreamInfo> for StreamInfo {
    fn from(value: types::stream::StreamInfo) -> Self {
        let types::stream::StreamInfo {
            name,
            created_at,
            deleted_at,
            ..
        } = value;
        Self {
            name,
            created_at,
            deleted_at,
        }
    }
}
/// Query parameters accepted when listing streams.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::IntoParams))]
#[cfg_attr(feature = "utoipa", into_params(parameter_in = Query))]
pub struct ListStreamsRequest {
/// Optional stream name prefix; the schema default is the empty string.
#[cfg_attr(feature = "utoipa", param(value_type = String, default = "", required = false))]
pub prefix: Option<StreamNamePrefix>,
/// Optional stream name to start after; the schema default is the empty string.
#[cfg_attr(feature = "utoipa", param(value_type = String, default = "", required = false))]
pub start_after: Option<StreamNameStartAfter>,
/// Optional limit on the number of results; the schema advertises a default and maximum of 1000.
#[cfg_attr(feature = "utoipa", param(value_type = usize, maximum = 1000, default = 1000, required = false))]
pub limit: Option<usize>,
}
// Expands `impl_list_request_conversions!` for `ListStreamsRequest`, parameterized
// by the stream-name prefix and start-after types.
// NOTE(review): the generated impls are defined by the macro in the parent module,
// which is not visible from this file — confirm there what conversions are produced.
super::impl_list_request_conversions!(
ListStreamsRequest,
types::stream::StreamNamePrefix,
types::stream::StreamNameStartAfter
);
/// Response body for a stream listing.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct ListStreamsResponse {
/// Listed streams; the schema caps this at 1000 items.
#[cfg_attr(feature = "utoipa", schema(max_items = 1000))]
pub streams: Vec<StreamInfo>,
// NOTE(review): by its name this flags that further results exist beyond
// `streams` — confirm against the handler that populates it.
pub has_more: bool,
}
/// Request body for creating a stream.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct CreateStreamRequest {
/// Stream name.
pub stream: StreamName,
/// Optional stream configuration.
pub config: Option<StreamConfig>,
}
/// Position of a record in a stream: a sequence number paired with a timestamp.
///
/// API counterpart of `record::StreamPosition`; the two convert into each other
/// field by field.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct StreamPosition {
/// Sequence number.
pub seq_num: record::SeqNum,
/// Timestamp.
pub timestamp: record::Timestamp,
}
/// Converts an internal stream position into the API type, carrying the
/// sequence number and timestamp over unchanged.
impl From<record::StreamPosition> for StreamPosition {
    fn from(record::StreamPosition { seq_num, timestamp }: record::StreamPosition) -> Self {
        Self { seq_num, timestamp }
    }
}
impl From<StreamPosition> for record::StreamPosition {
fn from(pos: StreamPosition) -> Self {
Self {
seq_num: pos.seq_num,
timestamp: pos.timestamp,
}
}
}
/// Response body carrying a stream's tail position.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct TailResponse {
/// Tail position.
pub tail: StreamPosition,
}
/// Query parameters selecting where a read starts.
///
/// At most one of `seq_num`, `timestamp` and `tail_offset` may be provided;
/// the conversion into `types::stream::ReadStart` rejects any combination of
/// them and falls back to a tail offset of 0 when none is given.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::IntoParams))]
#[cfg_attr(feature = "utoipa", into_params(parameter_in = Query))]
pub struct ReadStart {
/// Start from this sequence number.
#[cfg_attr(feature = "utoipa", param(value_type = record::SeqNum, required = false))]
pub seq_num: Option<record::SeqNum>,
/// Start from this timestamp.
#[cfg_attr(feature = "utoipa", param(value_type = record::Timestamp, required = false))]
pub timestamp: Option<record::Timestamp>,
/// Start from this offset relative to the tail.
#[cfg_attr(feature = "utoipa", param(value_type = u64, required = false))]
pub tail_offset: Option<u64>,
/// Clamp flag; treated as `false` when omitted.
// NOTE(review): what exactly gets clamped is decided downstream — confirm
// against the consumer of `types::stream::ReadStart::clamp`.
#[cfg_attr(feature = "utoipa", param(value_type = bool, required = false))]
pub clamp: Option<bool>,
}
impl TryFrom<ReadStart> for types::stream::ReadStart {
type Error = types::ValidationError;
fn try_from(value: ReadStart) -> Result<Self, Self::Error> {
let from = match (value.seq_num, value.timestamp, value.tail_offset) {
(Some(seq_num), None, None) => types::stream::ReadFrom::SeqNum(seq_num),
(None, Some(timestamp), None) => types::stream::ReadFrom::Timestamp(timestamp),
(None, None, Some(tail_offset)) => types::stream::ReadFrom::TailOffset(tail_offset),
(None, None, None) => types::stream::ReadFrom::TailOffset(0),
_ => {
return Err(types::ValidationError(
"only one of seq_num, timestamp, or tail_offset can be provided".to_owned(),
));
}
};
let clamp = value.clamp.unwrap_or(false);
Ok(Self { from, clamp })
}
}
/// Query parameters bounding where a read ends.
///
/// When converted into `types::stream::ReadEnd`, `count` and `bytes` are
/// combined into a single read limit.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::IntoParams))]
#[cfg_attr(feature = "utoipa", into_params(parameter_in = Query))]
pub struct ReadEnd {
/// Record count limit.
#[cfg_attr(feature = "utoipa", param(value_type = u64, required = false))]
pub count: Option<usize>,
/// Bytes limit.
#[cfg_attr(feature = "utoipa", param(value_type = usize, required = false))]
pub bytes: Option<usize>,
/// Timestamp bound for the read.
// NOTE(review): whether this bound is inclusive or exclusive is decided by the
// target type of `until.into()` — confirm there.
#[cfg_attr(feature = "utoipa", param(value_type = record::Timestamp, required = false))]
pub until: Option<record::Timestamp>,
/// Wait duration in whole seconds.
#[cfg_attr(feature = "utoipa", param(value_type = u32, required = false))]
pub wait: Option<u32>,
}
impl From<ReadEnd> for types::stream::ReadEnd {
fn from(value: ReadEnd) -> Self {
Self {
limit: s2_common::read_extent::ReadLimit::from_count_and_bytes(
value.count,
value.bytes,
),
until: value.until.into(),
wait: value.wait.map(|w| Duration::from_secs(w as u64)),
}
}
}
/// A read request in one of three forms: unary, event stream, or S2S.
///
/// Every variant carries an encryption spec; the remaining fields describe how
/// the response is to be shaped for that form.
#[derive(Debug, Clone)]
pub enum ReadRequest {
/// Unary read.
Unary {
/// Encryption spec for the request.
encryption: EncryptionSpec,
/// Data format.
format: Format,
/// Whether the response is JSON or proto.
response_mime: JsonOrProto,
},
/// Event-stream read.
EventStream {
/// Encryption spec for the request.
encryption: EncryptionSpec,
/// Data format.
format: Format,
/// Last event ID, if one was supplied.
last_event_id: Option<sse::LastEventId>,
},
/// S2S read.
S2s {
/// Encryption spec for the request.
encryption: EncryptionSpec,
/// Compression algorithm for the response.
response_compression: s2s::CompressionAlgorithm,
},
}
/// An append request in one of two forms: unary, or S2S with a stream of inputs.
///
/// `Debug` is implemented by hand for this type rather than derived.
pub enum AppendRequest {
/// Unary append carrying a single input.
Unary {
/// Encryption spec for the request.
encryption: EncryptionSpec,
/// The append input.
input: types::stream::AppendInput,
/// Whether the response is JSON or proto.
response_mime: JsonOrProto,
},
/// S2S append carrying a stream of inputs.
S2s {
/// Encryption spec for the request.
encryption: EncryptionSpec,
/// Stream of append inputs; each item may instead be an `AppendInputStreamError`.
inputs: BoxStream<'static, Result<types::stream::AppendInput, AppendInputStreamError>>,
/// Compression algorithm for the response.
response_compression: s2s::CompressionAlgorithm,
},
}
impl std::fmt::Debug for AppendRequest {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
AppendRequest::Unary {
encryption,
input,
response_mime: response,
} => f
.debug_struct("AppendRequest::Unary")
.field("encryption", encryption)
.field("input", input)
.field("response", response)
.finish(),
AppendRequest::S2s {
encryption,
response_compression,
..
} => f
.debug_struct("AppendRequest::S2s")
.field("encryption", encryption)
.field("response_compression", response_compression)
.finish(),
}
}
}
/// Error type of the items in the S2S append input stream.
#[derive(Debug, thiserror::Error)]
pub enum AppendInputStreamError {
/// An I/O error, reported as a failure to decode an S2S frame.
#[error("Failed to decode S2S frame: {0}")]
FrameDecode(#[from] std::io::Error),
/// A validation error, displayed transparently.
#[error(transparent)]
Validation(#[from] types::ValidationError),
}
/// A record header as a `(name, value)` pair of strings.
///
/// Both strings are in the request's or response's `Format` encoding: they are
/// produced by `Format::encode` when reading and passed through
/// `Format::decode` when appending.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct Header(pub String, pub String);
/// A record together with its position in the stream, in serializable form.
///
/// Serialize-only; built from a `record::SequencedRecord` via `SequencedRecord::encode`.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct SequencedRecord {
/// Sequence number of the record.
pub seq_num: record::SeqNum,
/// Timestamp of the record.
pub timestamp: record::Timestamp,
/// Encoded headers; left out of the serialized output when empty.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
#[cfg_attr(feature = "utoipa", schema(required = false))]
pub headers: Vec<Header>,
/// Encoded body; left out of the serialized output when empty.
#[serde(default, skip_serializing_if = "String::is_empty")]
#[cfg_attr(feature = "utoipa", schema(required = false))]
pub body: String,
}
impl SequencedRecord {
pub fn encode(format: Format, record: record::SequencedRecord) -> Self {
let (record::StreamPosition { seq_num, timestamp }, record) = record.into_parts();
let (headers, body) = record.into_parts();
Self {
seq_num,
timestamp,
headers: headers
.into_iter()
.map(|h| Header(format.encode(&h.name), format.encode(&h.value)))
.collect(),
body: format.encode(&body),
}
}
}
/// A record to append, in its wire form.
///
/// Turned into a `types::stream::AppendRecord` by `AppendRecord::decode`.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct AppendRecord {
/// Optional timestamp for the record.
pub timestamp: Option<record::Timestamp>,
/// Encoded headers; defaults to empty when absent and is left out of the serialized output when empty.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
#[cfg_attr(feature = "utoipa", schema(required = false))]
pub headers: Vec<Header>,
/// Encoded body; defaults to empty when absent and is left out of the serialized output when empty.
#[serde(default, skip_serializing_if = "String::is_empty")]
#[cfg_attr(feature = "utoipa", schema(required = false))]
pub body: String,
}
impl AppendRecord {
    /// Decodes this wire-form record into a validated `types::stream::AppendRecord`.
    ///
    /// Header names, header values and the body are each passed through
    /// `format.decode` before the record is assembled.
    ///
    /// # Errors
    ///
    /// Returns a `ValidationError` when any decode step fails, when the record
    /// cannot be built from its parts, or when the final conversion is rejected.
    pub fn decode(
        self,
        format: Format,
    ) -> Result<types::stream::AppendRecord, types::ValidationError> {
        let Self {
            timestamp,
            headers: encoded_headers,
            body: encoded_body,
        } = self;

        // Headers are decoded before the body; the first failure aborts.
        let headers = encoded_headers
            .into_iter()
            .map(
                |Header(name, value)| -> Result<record::Header, types::ValidationError> {
                    let name = format.decode(name)?;
                    let value = format.decode(value)?;
                    Ok(record::Header { name, value })
                },
            )
            .try_collect()?;
        let body = format.decode(encoded_body)?;

        let record = record::Record::try_from_parts(headers, body)
            .map_err(|e| e.to_string())?
            .into();

        types::stream::AppendRecord::try_from(types::stream::AppendRecordParts { timestamp, record })
            .map_err(|e| types::ValidationError(e.to_string()))
    }
}
/// Payload of an append request, in its wire form.
///
/// Turned into a `types::stream::AppendInput` by `AppendInput::decode`.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct AppendInput {
/// Records to append.
pub records: Vec<AppendRecord>,
/// Optional sequence number, passed through unchanged on decode.
// NOTE(review): presumably the expected next sequence number for a
// conditional append — confirm against the append handler.
pub match_seq_num: Option<record::SeqNum>,
/// Optional fencing token, passed through unchanged on decode.
pub fencing_token: Option<record::FencingToken>,
}
impl AppendInput {
    /// Decodes this wire-form input into a validated `types::stream::AppendInput`.
    ///
    /// Each record is decoded with `format`, and the results are then converted
    /// into an `AppendRecordBatch`. `match_seq_num` and `fencing_token` are
    /// passed through as-is.
    ///
    /// # Errors
    ///
    /// Returns a `ValidationError` when a record fails to decode or the batch
    /// conversion is rejected.
    pub fn decode(
        self,
        format: Format,
    ) -> Result<types::stream::AppendInput, types::ValidationError> {
        let Self {
            records,
            match_seq_num,
            fencing_token,
        } = self;

        let decoded: Vec<types::stream::AppendRecord> = records
            .into_iter()
            .map(|wire_record| wire_record.decode(format))
            .try_collect()?;
        let records = types::stream::AppendRecordBatch::try_from(decoded)?;

        Ok(types::stream::AppendInput {
            records,
            match_seq_num,
            fencing_token,
        })
    }
}
/// Acknowledgement of an append, expressed as three stream positions.
///
/// Serialize-only; built from a `types::stream::AppendAck`.
// NOTE(review): whether `end` is inclusive or exclusive is not visible in this
// file — confirm against `types::stream::AppendAck`.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct AppendAck {
/// Start position.
pub start: StreamPosition,
/// End position.
pub end: StreamPosition,
/// Tail position.
pub tail: StreamPosition,
}
/// Converts the internal append acknowledgement into the API type, converting
/// each of its three positions.
impl From<types::stream::AppendAck> for AppendAck {
    fn from(ack: types::stream::AppendAck) -> Self {
        let types::stream::AppendAck {
            start, end, tail, ..
        } = ack;
        Self {
            start: start.into(),
            end: end.into(),
            tail: tail.into(),
        }
    }
}
/// Reason an append condition was not met.
///
/// Variants are serialized in snake case: `fencing_token_mismatch` and
/// `seq_num_mismatch`.
// NOTE(review): whether each payload holds the expected or the actual value is
// not visible in this file — confirm where these variants are constructed.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize, Deserialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
#[serde(rename_all = "snake_case")]
pub enum AppendConditionFailed {
/// Fencing token mismatch, carrying a fencing token.
#[cfg_attr(feature = "utoipa", schema(title = "fencing token"))]
FencingTokenMismatch(record::FencingToken),
/// Sequence number mismatch, carrying a sequence number.
#[cfg_attr(feature = "utoipa", schema(title = "seq num"))]
SeqNumMismatch(record::SeqNum),
}
/// A batch of records returned by a read, in serializable form.
///
/// Serialize-only; built from a `types::stream::ReadBatch` via `ReadBatch::encode`.
#[rustfmt::skip]
#[derive(Debug, Clone, Serialize)]
#[cfg_attr(feature = "utoipa", derive(utoipa::ToSchema))]
pub struct ReadBatch {
/// Records in the batch.
pub records: Vec<SequencedRecord>,
/// Tail position; left out of the serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub tail: Option<StreamPosition>,
}
impl ReadBatch {
    /// Builds the serializable form of `batch`, encoding every record with
    /// `format` and converting the tail position when one is present.
    pub fn encode(format: Format, batch: types::stream::ReadBatch) -> Self {
        let records = batch
            .records
            .into_iter()
            .map(|sequenced| SequencedRecord::encode(format, sequenced))
            .collect();
        let tail = batch.tail.map(|position| position.into());

        Self { records, tail }
    }
}