#[cfg(test)]
use std::sync::Arc;
use bytes::Bytes;
use serde::{Deserialize, Serialize};
use ursula_shard::{BucketStreamId, ShardPlacement};
use ursula_stream::{
ColdChunkRef, ExternalPayloadRef, ProducerRequest, StreamReadPlan, StreamReadSegment,
};
use crate::cold_store::{ColdStoreHandle, DEFAULT_CONTENT_TYPE};
use crate::engine::GroupEngineError;
use crate::engine_in_memory::InMemoryGroupEngine;
use crate::error::RuntimeError;
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CreateStreamRequest {
pub stream_id: BucketStreamId,
pub content_type: String,
pub content_type_explicit: bool,
pub initial_payload: Bytes,
pub close_after: bool,
pub stream_seq: Option<String>,
pub producer: Option<ProducerRequest>,
pub stream_ttl_seconds: Option<u64>,
pub stream_expires_at_ms: Option<u64>,
pub forked_from: Option<BucketStreamId>,
pub fork_offset: Option<u64>,
pub now_ms: u64,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct CreateStreamExternalRequest {
pub stream_id: BucketStreamId,
pub content_type: String,
pub initial_payload: ExternalPayloadRef,
pub close_after: bool,
pub stream_seq: Option<String>,
pub producer: Option<ProducerRequest>,
pub stream_ttl_seconds: Option<u64>,
pub stream_expires_at_ms: Option<u64>,
pub forked_from: Option<BucketStreamId>,
pub fork_offset: Option<u64>,
pub now_ms: u64,
}
impl CreateStreamExternalRequest {
pub fn from_create_request(
request: CreateStreamRequest,
initial_payload: ExternalPayloadRef,
) -> Self {
Self {
stream_id: request.stream_id,
content_type: request.content_type,
initial_payload,
close_after: request.close_after,
stream_seq: request.stream_seq,
producer: request.producer,
stream_ttl_seconds: request.stream_ttl_seconds,
stream_expires_at_ms: request.stream_expires_at_ms,
forked_from: request.forked_from,
fork_offset: request.fork_offset,
now_ms: request.now_ms,
}
}
}
impl CreateStreamRequest {
pub fn new(stream_id: BucketStreamId, content_type: impl Into<String>) -> Self {
Self {
stream_id,
content_type: content_type.into(),
content_type_explicit: true,
initial_payload: Bytes::new(),
close_after: false,
stream_seq: None,
producer: None,
stream_ttl_seconds: None,
stream_expires_at_ms: None,
forked_from: None,
fork_offset: None,
now_ms: 0,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct CreateStreamResponse {
pub placement: ShardPlacement,
pub next_offset: u64,
pub closed: bool,
pub already_exists: bool,
pub group_commit_index: u64,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HeadStreamRequest {
pub stream_id: BucketStreamId,
pub now_ms: u64,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HeadStreamResponse {
pub placement: ShardPlacement,
pub content_type: String,
pub tail_offset: u64,
pub closed: bool,
pub stream_ttl_seconds: Option<u64>,
pub stream_expires_at_ms: Option<u64>,
pub snapshot_offset: Option<u64>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReadStreamRequest {
pub stream_id: BucketStreamId,
pub offset: u64,
pub max_len: usize,
pub now_ms: u64,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReadStreamResponse {
pub placement: ShardPlacement,
pub offset: u64,
pub next_offset: u64,
pub content_type: String,
pub payload: Vec<u8>,
pub up_to_date: bool,
pub closed: bool,
}
pub enum GroupReadStreamBody {
Materialized(Vec<u8>),
Planned {
stream_id: BucketStreamId,
plan: StreamReadPlan,
cold_store: Option<ColdStoreHandle>,
},
#[cfg(test)]
Blocking {
entered: Arc<tokio::sync::Notify>,
release: Arc<tokio::sync::Notify>,
payload: Vec<u8>,
},
}
pub struct GroupReadStreamParts {
pub placement: ShardPlacement,
pub offset: u64,
pub next_offset: u64,
pub content_type: String,
pub up_to_date: bool,
pub closed: bool,
pub body: GroupReadStreamBody,
}
impl GroupReadStreamParts {
pub fn from_response(response: ReadStreamResponse) -> Self {
Self {
placement: response.placement,
offset: response.offset,
next_offset: response.next_offset,
content_type: response.content_type,
up_to_date: response.up_to_date,
closed: response.closed,
body: GroupReadStreamBody::Materialized(response.payload),
}
}
pub fn from_plan(
placement: ShardPlacement,
stream_id: BucketStreamId,
plan: StreamReadPlan,
cold_store: Option<ColdStoreHandle>,
) -> Self {
Self {
placement,
offset: plan.offset,
next_offset: plan.next_offset,
content_type: plan.content_type.clone(),
up_to_date: plan.up_to_date,
closed: plan.closed,
body: GroupReadStreamBody::Planned {
stream_id,
plan,
cold_store,
},
}
}
pub async fn into_response(self) -> Result<ReadStreamResponse, GroupEngineError> {
let payload = match &self.body {
GroupReadStreamBody::Materialized(payload) => payload.clone(),
GroupReadStreamBody::Planned {
stream_id,
plan,
cold_store,
} => {
InMemoryGroupEngine::read_payload_from_plan(cold_store.as_ref(), stream_id, plan)
.await?
}
#[cfg(test)]
GroupReadStreamBody::Blocking {
entered,
release,
payload,
} => {
entered.notify_one();
release.notified().await;
payload.clone()
}
};
Ok(ReadStreamResponse {
placement: self.placement,
offset: self.offset,
next_offset: self.next_offset,
content_type: self.content_type,
payload,
up_to_date: self.up_to_date,
closed: self.closed,
})
}
pub fn payload_is_empty(&self) -> bool {
match &self.body {
GroupReadStreamBody::Materialized(payload) => payload.is_empty(),
GroupReadStreamBody::Planned { plan, .. } => {
plan.segments.iter().all(|segment| match segment {
StreamReadSegment::Hot(payload) => payload.is_empty(),
StreamReadSegment::Object(segment) => segment.len == 0,
})
}
#[cfg(test)]
GroupReadStreamBody::Blocking { payload, .. } => payload.is_empty(),
}
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PublishSnapshotRequest {
pub stream_id: BucketStreamId,
pub snapshot_offset: u64,
pub content_type: String,
pub payload: Bytes,
pub now_ms: u64,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct PublishSnapshotResponse {
pub placement: ShardPlacement,
pub snapshot_offset: u64,
pub group_commit_index: u64,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReadSnapshotRequest {
pub stream_id: BucketStreamId,
pub snapshot_offset: Option<u64>,
pub now_ms: u64,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReadSnapshotResponse {
pub placement: ShardPlacement,
pub snapshot_offset: u64,
pub next_offset: u64,
pub content_type: String,
pub payload: Vec<u8>,
pub up_to_date: bool,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeleteSnapshotRequest {
pub stream_id: BucketStreamId,
pub snapshot_offset: u64,
pub now_ms: u64,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BootstrapStreamRequest {
pub stream_id: BucketStreamId,
pub now_ms: u64,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BootstrapUpdate {
pub start_offset: u64,
pub next_offset: u64,
pub content_type: String,
pub payload: Vec<u8>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct BootstrapStreamResponse {
pub placement: ShardPlacement,
pub snapshot_offset: Option<u64>,
pub snapshot_content_type: String,
pub snapshot_payload: Vec<u8>,
pub updates: Vec<BootstrapUpdate>,
pub next_offset: u64,
pub up_to_date: bool,
pub closed: bool,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CloseStreamRequest {
pub stream_id: BucketStreamId,
pub stream_seq: Option<String>,
pub producer: Option<ProducerRequest>,
pub now_ms: u64,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct CloseStreamResponse {
pub placement: ShardPlacement,
pub next_offset: u64,
pub group_commit_index: u64,
pub deduplicated: bool,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct DeleteStreamRequest {
pub stream_id: BucketStreamId,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct DeleteStreamResponse {
pub placement: ShardPlacement,
pub group_commit_index: u64,
pub hard_deleted: bool,
pub parent_to_release: Option<BucketStreamId>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct ForkRefResponse {
pub placement: ShardPlacement,
pub fork_ref_count: u64,
pub hard_deleted: bool,
pub parent_to_release: Option<BucketStreamId>,
pub group_commit_index: u64,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FlushColdRequest {
pub stream_id: BucketStreamId,
pub chunk: ColdChunkRef,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct FlushColdResponse {
pub placement: ShardPlacement,
pub hot_start_offset: u64,
pub group_commit_index: u64,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct TouchStreamAccessResponse {
pub placement: ShardPlacement,
pub changed: bool,
pub expired: bool,
pub group_commit_index: u64,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PlanColdFlushRequest {
pub stream_id: BucketStreamId,
pub min_hot_bytes: usize,
pub max_flush_bytes: usize,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct PlanGroupColdFlushRequest {
pub min_hot_bytes: usize,
pub max_flush_bytes: usize,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ColdHotBacklog {
pub stream_id: BucketStreamId,
pub stream_hot_bytes: u64,
pub group_hot_bytes: u64,
}
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct ColdWriteAdmission {
pub max_hot_bytes_per_group: Option<u64>,
}
impl ColdWriteAdmission {
pub(crate) fn is_enabled(self) -> bool {
self.max_hot_bytes_per_group.is_some()
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AppendRequest {
pub stream_id: BucketStreamId,
pub content_type: String,
pub payload: Bytes,
pub close_after: bool,
pub stream_seq: Option<String>,
pub producer: Option<ProducerRequest>,
pub now_ms: u64,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AppendExternalRequest {
pub stream_id: BucketStreamId,
pub content_type: String,
pub payload: ExternalPayloadRef,
pub close_after: bool,
pub stream_seq: Option<String>,
pub producer: Option<ProducerRequest>,
pub now_ms: u64,
}
impl AppendExternalRequest {
pub fn from_append_request(request: AppendRequest, payload: ExternalPayloadRef) -> Self {
Self {
stream_id: request.stream_id,
content_type: request.content_type,
payload,
close_after: request.close_after,
stream_seq: request.stream_seq,
producer: request.producer,
now_ms: request.now_ms,
}
}
}
impl AppendRequest {
pub fn new(stream_id: BucketStreamId, payload_len: u64) -> Self {
Self {
stream_id,
content_type: DEFAULT_CONTENT_TYPE.to_owned(),
payload: Bytes::from(vec![
0;
usize::try_from(payload_len)
.expect("payload_len fits usize")
]),
close_after: false,
stream_seq: None,
producer: None,
now_ms: 0,
}
}
pub fn from_bytes(stream_id: BucketStreamId, payload: impl Into<Bytes>) -> Self {
Self {
stream_id,
content_type: DEFAULT_CONTENT_TYPE.to_owned(),
payload: payload.into(),
close_after: false,
stream_seq: None,
producer: None,
now_ms: 0,
}
}
pub fn payload_len(&self) -> u64 {
u64::try_from(self.payload.len()).expect("payload len fits u64")
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AppendBatchRequest {
pub stream_id: BucketStreamId,
pub content_type: String,
pub payloads: Vec<Bytes>,
pub producer: Option<ProducerRequest>,
pub now_ms: u64,
}
impl AppendBatchRequest {
pub fn new<P>(stream_id: BucketStreamId, payloads: Vec<P>) -> Self
where
P: Into<Bytes>,
{
Self {
stream_id,
content_type: DEFAULT_CONTENT_TYPE.to_owned(),
payloads: payloads.into_iter().map(Into::into).collect(),
producer: None,
now_ms: 0,
}
}
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct AppendResponse {
pub placement: ShardPlacement,
pub start_offset: u64,
pub next_offset: u64,
pub stream_append_count: u64,
pub group_commit_index: u64,
pub closed: bool,
pub deduplicated: bool,
pub producer: Option<ProducerRequest>,
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct AppendBatchResponse {
pub placement: ShardPlacement,
pub items: Vec<Result<AppendResponse, RuntimeError>>,
}
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct StreamAppendCount {
pub stream_id: BucketStreamId,
pub append_count: u64,
}