streamstore 0.21.1

DEPRECATED: Use s2-sdk instead
Documentation
use prost_types::method_options::IdempotencyLevel;
use tonic::{IntoRequest, codec::CompressionEncoding, transport::Channel};
use tonic_side_effect::{FrameSignal, RequestFrameMonitor};

use super::{
    ClientError, ServiceRequest, ServiceStreamingRequest, ServiceStreamingResponse,
    StreamingRequest, StreamingResponse,
};
use crate::{
    api::{self, stream_service_client::StreamServiceClient},
    client::AppendRetryPolicy,
    types::{self, StreamPosition},
};

#[derive(Debug, Clone)]
pub struct CheckTailServiceRequest {
    client: StreamServiceClient<Channel>,
    stream: String,
}

impl CheckTailServiceRequest {
    pub fn new(client: StreamServiceClient<Channel>, stream: impl Into<String>) -> Self {
        Self {
            client,
            stream: stream.into(),
        }
    }
}

impl ServiceRequest for CheckTailServiceRequest {
    type ApiRequest = api::CheckTailRequest;
    type Response = StreamPosition;
    type ApiResponse = api::CheckTailResponse;
    const IDEMPOTENCY_LEVEL: IdempotencyLevel = IdempotencyLevel::NoSideEffects;

    fn prepare_request(&mut self) -> Result<tonic::Request<Self::ApiRequest>, types::ConvertError> {
        let req = api::CheckTailRequest {
            stream: self.stream.clone(),
        };
        Ok(req.into_request())
    }

    async fn send(
        &mut self,
        req: tonic::Request<Self::ApiRequest>,
    ) -> Result<tonic::Response<Self::ApiResponse>, tonic::Status> {
        self.client.check_tail(req).await
    }

    fn parse_response(
        &self,
        resp: tonic::Response<Self::ApiResponse>,
    ) -> Result<Self::Response, types::ConvertError> {
        Ok(resp.into_inner().into())
    }
}

#[derive(Debug, Clone)]
pub struct ReadServiceRequest {
    client: StreamServiceClient<Channel>,
    stream: String,
    req: types::ReadRequest,
}

impl ReadServiceRequest {
    pub fn new(
        mut client: StreamServiceClient<Channel>,
        stream: impl Into<String>,
        req: types::ReadRequest,
        compression: bool,
    ) -> Self {
        if compression {
            client = client.accept_compressed(CompressionEncoding::Zstd);
        }
        Self {
            client,
            stream: stream.into(),
            req,
        }
    }
}

impl ServiceRequest for ReadServiceRequest {
    type ApiRequest = api::ReadRequest;
    type Response = types::ReadOutput;
    type ApiResponse = api::ReadResponse;
    const IDEMPOTENCY_LEVEL: IdempotencyLevel = IdempotencyLevel::NoSideEffects;

    fn prepare_request(&mut self) -> Result<tonic::Request<Self::ApiRequest>, types::ConvertError> {
        let req = self.req.clone().try_into_api_type(self.stream.clone())?;
        Ok(req.into_request())
    }

    async fn send(
        &mut self,
        req: tonic::Request<Self::ApiRequest>,
    ) -> Result<tonic::Response<Self::ApiResponse>, tonic::Status> {
        self.client.read(req).await
    }

    fn parse_response(
        &self,
        resp: tonic::Response<Self::ApiResponse>,
    ) -> Result<Self::Response, types::ConvertError> {
        resp.into_inner().try_into()
    }
}

#[derive(Debug, Clone)]
pub struct ReadSessionServiceRequest {
    client: StreamServiceClient<Channel>,
    stream: String,
    req: types::ReadSessionRequest,
}

impl ReadSessionServiceRequest {
    pub fn new(
        mut client: StreamServiceClient<Channel>,
        stream: impl Into<String>,
        req: types::ReadSessionRequest,
        compression: bool,
    ) -> Self {
        if compression {
            client = client.accept_compressed(CompressionEncoding::Zstd);
        }
        Self {
            client,
            stream: stream.into(),
            req,
        }
    }

    pub(crate) fn req_mut(&mut self) -> &mut types::ReadSessionRequest {
        &mut self.req
    }
}

impl ServiceRequest for ReadSessionServiceRequest {
    type ApiRequest = api::ReadSessionRequest;
    type Response = ServiceStreamingResponse<ReadSessionStreamingResponse>;
    type ApiResponse = tonic::Streaming<api::ReadSessionResponse>;
    const IDEMPOTENCY_LEVEL: IdempotencyLevel = IdempotencyLevel::NoSideEffects;

    fn prepare_request(&mut self) -> Result<tonic::Request<Self::ApiRequest>, types::ConvertError> {
        let req = self.req.clone().into_api_type(self.stream.clone());
        Ok(req.into_request())
    }

    async fn send(
        &mut self,
        req: tonic::Request<Self::ApiRequest>,
    ) -> Result<tonic::Response<Self::ApiResponse>, tonic::Status> {
        self.client.read_session(req).await
    }

    fn parse_response(
        &self,
        resp: tonic::Response<Self::ApiResponse>,
    ) -> Result<Self::Response, types::ConvertError> {
        Ok(ServiceStreamingResponse::new(
            ReadSessionStreamingResponse,
            resp.into_inner(),
        ))
    }
}

pub struct ReadSessionStreamingResponse;

impl StreamingResponse for ReadSessionStreamingResponse {
    type ResponseItem = types::ReadOutput;
    type ApiResponseItem = api::ReadSessionResponse;

    fn parse_response_item(
        &self,
        resp: Self::ApiResponseItem,
    ) -> Result<Self::ResponseItem, ClientError> {
        resp.try_into().map_err(Into::into)
    }
}

#[derive(Debug, Clone)]
pub struct AppendServiceRequest {
    client: StreamServiceClient<RequestFrameMonitor>,
    append_retry_policy: AppendRetryPolicy,
    frame_signal: FrameSignal,
    stream: String,
    req: types::AppendInput,
}

impl AppendServiceRequest {
    pub fn new(
        mut client: StreamServiceClient<RequestFrameMonitor>,
        append_retry_policy: AppendRetryPolicy,
        frame_signal: FrameSignal,
        stream: impl Into<String>,
        req: types::AppendInput,
        compression: bool,
    ) -> Self {
        if compression {
            client = client.send_compressed(CompressionEncoding::Zstd);
        }
        Self {
            client,
            append_retry_policy,
            frame_signal,
            stream: stream.into(),
            req,
        }
    }
}

impl ServiceRequest for AppendServiceRequest {
    type ApiRequest = api::AppendRequest;
    type Response = types::AppendAck;
    type ApiResponse = api::AppendResponse;
    const IDEMPOTENCY_LEVEL: IdempotencyLevel = IdempotencyLevel::IdempotencyUnknown;

    fn prepare_request(&mut self) -> Result<tonic::Request<Self::ApiRequest>, types::ConvertError> {
        Ok(api::AppendRequest {
            input: Some(self.req.clone().into_api_type(self.stream.clone())),
        }
        .into_request())
    }

    async fn send(
        &mut self,
        req: tonic::Request<Self::ApiRequest>,
    ) -> Result<tonic::Response<Self::ApiResponse>, tonic::Status> {
        self.client.append(req).await
    }

    fn parse_response(
        &self,
        resp: tonic::Response<Self::ApiResponse>,
    ) -> Result<Self::Response, types::ConvertError> {
        resp.into_inner().try_into()
    }

    fn should_retry(&self, err: &ClientError) -> bool {
        if let ClientError::Service(status) = err {
            let retryable_error = matches!(
                status.code(),
                tonic::Code::Unavailable
                    | tonic::Code::DeadlineExceeded
                    | tonic::Code::Cancelled
                    | tonic::Code::Unknown
                    | tonic::Code::ResourceExhausted
            );
            let policy_compliant = match self.append_retry_policy {
                AppendRetryPolicy::All => true,
                AppendRetryPolicy::NoSideEffects => !self.frame_signal.is_signalled(),
            };
            retryable_error && policy_compliant
        } else {
            false
        }
    }
}

#[derive(Debug, Clone)]
pub struct AppendSessionServiceRequest<S>
where
    S: Send + futures::Stream<Item = types::AppendInput> + Unpin,
{
    client: StreamServiceClient<RequestFrameMonitor>,
    stream: String,
    req: Option<S>,
}

impl<S> AppendSessionServiceRequest<S>
where
    S: Send + futures::Stream<Item = types::AppendInput> + Unpin,
{
    pub fn new(
        mut client: StreamServiceClient<RequestFrameMonitor>,
        stream: impl Into<String>,
        req: S,
        compression: bool,
    ) -> Self {
        if compression {
            client = client.send_compressed(CompressionEncoding::Zstd);
        }
        Self {
            client,
            stream: stream.into(),
            req: Some(req),
        }
    }
}

impl<S: std::fmt::Debug> ServiceRequest for AppendSessionServiceRequest<S>
where
    S: 'static + Send + futures::Stream<Item = types::AppendInput> + Unpin,
{
    type ApiRequest = ServiceStreamingRequest<AppendSessionStreamingRequest, S>;
    type Response = ServiceStreamingResponse<AppendSessionStreamingResponse>;
    type ApiResponse = tonic::Streaming<api::AppendSessionResponse>;

    const IDEMPOTENCY_LEVEL: IdempotencyLevel = IdempotencyLevel::IdempotencyUnknown;

    fn prepare_request(&mut self) -> Result<tonic::Request<Self::ApiRequest>, types::ConvertError> {
        let req = ServiceStreamingRequest::new(
            AppendSessionStreamingRequest::new(&self.stream),
            self.req.take().ok_or("missing streaming append request")?,
        );
        Ok(req.into_request())
    }

    async fn send(
        &mut self,
        req: tonic::Request<Self::ApiRequest>,
    ) -> Result<tonic::Response<Self::ApiResponse>, tonic::Status> {
        self.client.append_session(req).await
    }

    fn parse_response(
        &self,
        resp: tonic::Response<Self::ApiResponse>,
    ) -> Result<Self::Response, types::ConvertError> {
        Ok(ServiceStreamingResponse::new(
            AppendSessionStreamingResponse,
            resp.into_inner(),
        ))
    }
}

pub struct AppendSessionStreamingRequest {
    stream: String,
}

impl AppendSessionStreamingRequest {
    fn new(stream: impl Into<String>) -> Self {
        Self {
            stream: stream.into(),
        }
    }
}

impl StreamingRequest for AppendSessionStreamingRequest {
    type RequestItem = types::AppendInput;
    type ApiRequestItem = api::AppendSessionRequest;

    fn prepare_request_item(&self, req: Self::RequestItem) -> Self::ApiRequestItem {
        api::AppendSessionRequest {
            input: Some(req.into_api_type(&self.stream)),
        }
    }
}

pub struct AppendSessionStreamingResponse;

impl StreamingResponse for AppendSessionStreamingResponse {
    type ResponseItem = types::AppendAck;
    type ApiResponseItem = api::AppendSessionResponse;

    fn parse_response_item(
        &self,
        resp: Self::ApiResponseItem,
    ) -> Result<Self::ResponseItem, ClientError> {
        resp.try_into().map_err(Into::into)
    }
}