use osproxy_core::{ClusterId, Target, TraceContext};
use osproxy_spi::{HttpMethod, Protocol};
use crate::error::SinkError;
/// Parameters for a single-document read handed to [`Reader::get`].
///
/// Construct with [`ReadOp::new`] and refine with the `with_*` builders;
/// each builder consumes the value and returns the updated one.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReadOp {
    /// Destination the read is addressed to.
    pub target: Target,
    /// Identifier of the document to read.
    pub id: String,
    /// Optional routing value carried with the read.
    pub routing: Option<String>,
    /// Wire protocol to use; [`ReadOp::new`] starts at `Protocol::Http1`.
    pub protocol: Protocol,
    /// Optional trace context; `None` unless set via [`ReadOp::with_trace`].
    pub trace: Option<TraceContext>,
    /// `(name, value)` header pairs carried with the operation; empty by default.
    pub forward_headers: Vec<(String, String)>,
}

impl ReadOp {
    /// Creates a read of `id` against `target` with the given `routing`.
    ///
    /// The remaining fields start at their defaults: `Protocol::Http1`,
    /// no trace context and no forwarded headers.
    #[must_use]
    pub fn new(target: Target, id: impl Into<String>, routing: Option<String>) -> Self {
        let id = id.into();
        Self {
            target,
            id,
            routing,
            protocol: Protocol::Http1,
            trace: None,
            forward_headers: Vec::new(),
        }
    }

    /// Returns the operation with `protocol` replacing the current protocol.
    #[must_use]
    pub fn with_protocol(self, protocol: Protocol) -> Self {
        Self { protocol, ..self }
    }

    /// Returns the operation with its trace context replaced by `trace`
    /// (passing `None` clears it).
    #[must_use]
    pub fn with_trace(self, trace: Option<TraceContext>) -> Self {
        Self { trace, ..self }
    }

    /// Returns the operation with its forwarded headers replaced (not
    /// extended) by `headers`.
    #[must_use]
    pub fn with_forward_headers(self, headers: Vec<(String, String)>) -> Self {
        Self {
            forward_headers: headers,
            ..self
        }
    }
}
/// Result of a single-document read.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReadOutcome {
    /// Status code reported for the read.
    pub status: u16,
    /// `true` when built via [`ReadOutcome::found`], `false` via
    /// [`ReadOutcome::not_found`].
    pub found: bool,
    /// Raw response body bytes.
    pub body: Vec<u8>,
    /// Pool-reuse flag; `false` until set with [`ReadOutcome::with_pool_reuse`].
    pub pool_reuse: bool,
}

impl ReadOutcome {
    /// Shared constructor behind [`ReadOutcome::found`] and
    /// [`ReadOutcome::not_found`]; `pool_reuse` always starts as `false`.
    fn build(status: u16, found: bool, body: Vec<u8>) -> Self {
        Self {
            status,
            found,
            body,
            pool_reuse: false,
        }
    }

    /// Creates an outcome flagged as found, carrying `status` and `body`.
    #[must_use]
    pub fn found(status: u16, body: Vec<u8>) -> Self {
        Self::build(status, true, body)
    }

    /// Creates an outcome flagged as not found, carrying `status` and `body`.
    #[must_use]
    pub fn not_found(status: u16, body: Vec<u8>) -> Self {
        Self::build(status, false, body)
    }

    /// Returns the outcome with its `pool_reuse` flag set to `reused`.
    #[must_use]
    pub fn with_pool_reuse(self, reused: bool) -> Self {
        Self {
            pool_reuse: reused,
            ..self
        }
    }
}
/// Parameters for a search-style request; accepted by [`Reader::search`],
/// [`Reader::count`] and [`Reader::search_stream`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SearchOp {
    /// Destination the search is addressed to.
    pub target: Target,
    /// Raw request body bytes.
    pub body: Vec<u8>,
    /// Wire protocol to use; [`SearchOp::new`] starts at `Protocol::Http1`.
    pub protocol: Protocol,
    /// Optional query string; `None` unless set via [`SearchOp::with_query`].
    pub query: Option<String>,
    /// Optional trace context; `None` unless set via [`SearchOp::with_trace`].
    pub trace: Option<TraceContext>,
    /// `(name, value)` header pairs carried with the operation; empty by default.
    pub forward_headers: Vec<(String, String)>,
}

impl SearchOp {
    /// Creates a search against `target` with the given request `body`.
    ///
    /// Defaults: `Protocol::Http1`, no query string, no trace context and
    /// no forwarded headers.
    #[must_use]
    pub fn new(target: Target, body: Vec<u8>) -> Self {
        Self {
            target,
            body,
            protocol: Protocol::Http1,
            query: None,
            trace: None,
            forward_headers: Vec::new(),
        }
    }

    /// Returns the operation with `protocol` replacing the current protocol.
    #[must_use]
    pub fn with_protocol(self, protocol: Protocol) -> Self {
        Self { protocol, ..self }
    }

    /// Returns the operation with its query string replaced by `query`
    /// (passing `None` clears it).
    #[must_use]
    pub fn with_query(self, query: Option<String>) -> Self {
        Self { query, ..self }
    }

    /// Returns the operation with its trace context replaced by `trace`
    /// (passing `None` clears it).
    #[must_use]
    pub fn with_trace(self, trace: Option<TraceContext>) -> Self {
        Self { trace, ..self }
    }

    /// Returns the operation with its forwarded headers replaced (not
    /// extended) by `headers`.
    #[must_use]
    pub fn with_forward_headers(self, headers: Vec<(String, String)>) -> Self {
        Self {
            forward_headers: headers,
            ..self
        }
    }
}
/// Fully buffered result of a search.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SearchOutcome {
    /// Status code reported for the search.
    pub status: u16,
    /// Raw response body bytes.
    pub body: Vec<u8>,
    /// Pool-reuse flag; `false` until set with [`SearchOutcome::with_pool_reuse`].
    pub pool_reuse: bool,
}

impl SearchOutcome {
    /// Creates an outcome from `status` and `body`, with `pool_reuse`
    /// initialised to `false`.
    #[must_use]
    pub fn new(status: u16, body: Vec<u8>) -> Self {
        let pool_reuse = false;
        Self {
            status,
            body,
            pool_reuse,
        }
    }

    /// Returns the outcome with its `pool_reuse` flag set to `reused`.
    #[must_use]
    pub fn with_pool_reuse(self, reused: bool) -> Self {
        Self {
            pool_reuse: reused,
            ..self
        }
    }
}
/// Result of a count request: a status plus the reported count.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct CountOutcome {
    /// Status code reported for the count.
    pub status: u16,
    /// The count value carried by this outcome.
    pub count: u64,
    /// Pool-reuse flag; `false` until set with [`CountOutcome::with_pool_reuse`].
    pub pool_reuse: bool,
}

impl CountOutcome {
    /// Creates an outcome from `status` and `count`, with `pool_reuse`
    /// initialised to `false`.
    #[must_use]
    pub fn new(status: u16, count: u64) -> Self {
        let pool_reuse = false;
        Self {
            status,
            count,
            pool_reuse,
        }
    }

    /// Returns a copy of the outcome with its `pool_reuse` flag set to `reused`.
    #[must_use]
    pub fn with_pool_reuse(self, reused: bool) -> Self {
        Self {
            pool_reuse: reused,
            ..self
        }
    }
}
/// Parameters for a cursor passthrough request handed to [`Reader::cursor`].
///
/// Unlike the target-based operations, this one is addressed by cluster,
/// HTTP method and path.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CursorOp {
    /// Cluster the request is addressed to.
    pub cluster: ClusterId,
    /// HTTP method of the request.
    pub method: HttpMethod,
    /// Request path.
    pub path: String,
    /// Raw request body bytes.
    pub body: Vec<u8>,
    /// Optional query string; `None` unless set via [`CursorOp::with_query`].
    pub query: Option<String>,
    /// Optional endpoint; `None` unless set via [`CursorOp::with_endpoint`].
    pub endpoint: Option<String>,
    /// Wire protocol to use; [`CursorOp::new`] starts at `Protocol::Http1`.
    pub protocol: Protocol,
    /// Optional trace context; `None` unless set via [`CursorOp::with_trace`].
    pub trace: Option<TraceContext>,
    /// `(name, value)` header pairs carried with the operation; empty by default.
    pub forward_headers: Vec<(String, String)>,
}

impl CursorOp {
    /// Creates a cursor request for `method` + `path` on `cluster` with the
    /// given `body`.
    ///
    /// Defaults: no query string, no endpoint, `Protocol::Http1`, no trace
    /// context and no forwarded headers.
    #[must_use]
    pub fn new(
        cluster: ClusterId,
        method: HttpMethod,
        path: impl Into<String>,
        body: Vec<u8>,
    ) -> Self {
        let path = path.into();
        Self {
            cluster,
            method,
            path,
            body,
            query: None,
            endpoint: None,
            protocol: Protocol::Http1,
            trace: None,
            forward_headers: Vec::new(),
        }
    }

    /// Returns the operation with its forwarded headers replaced (not
    /// extended) by `headers`.
    #[must_use]
    pub fn with_forward_headers(self, headers: Vec<(String, String)>) -> Self {
        Self {
            forward_headers: headers,
            ..self
        }
    }

    /// Returns the operation with its endpoint replaced by `endpoint`
    /// (passing `None` clears it).
    #[must_use]
    pub fn with_endpoint(self, endpoint: Option<String>) -> Self {
        Self { endpoint, ..self }
    }

    /// Returns the operation with `protocol` replacing the current protocol.
    #[must_use]
    pub fn with_protocol(self, protocol: Protocol) -> Self {
        Self { protocol, ..self }
    }

    /// Returns the operation with its query string replaced by `query`
    /// (passing `None` clears it).
    #[must_use]
    pub fn with_query(self, query: Option<String>) -> Self {
        Self { query, ..self }
    }

    /// Returns the operation with its trace context replaced by `trace`
    /// (passing `None` clears it).
    #[must_use]
    pub fn with_trace(self, trace: Option<TraceContext>) -> Self {
        Self { trace, ..self }
    }
}
/// Parameters for a streaming forward handed to [`Reader::forward_stream`].
///
/// This type carries no body of its own: the request body is passed to
/// [`Reader::forward_stream`] as a separate argument.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ForwardOp {
    /// Cluster the request is addressed to.
    pub cluster: ClusterId,
    /// HTTP method of the request.
    pub method: HttpMethod,
    /// Request path.
    pub path: String,
    /// Optional query string; `None` unless set via [`ForwardOp::with_query`].
    pub query: Option<String>,
    /// Optional endpoint; `None` unless set via [`ForwardOp::with_endpoint`].
    pub endpoint: Option<String>,
    /// Wire protocol to use; [`ForwardOp::new`] starts at `Protocol::Http1`.
    pub protocol: Protocol,
    /// Optional trace context; `None` unless set via [`ForwardOp::with_trace`].
    pub trace: Option<TraceContext>,
    /// `(name, value)` header pairs carried with the operation; empty by default.
    pub forward_headers: Vec<(String, String)>,
}

impl ForwardOp {
    /// Creates a forward request for `method` + `path` on `cluster`.
    ///
    /// Defaults: no query string, no endpoint, `Protocol::Http1`, no trace
    /// context and no forwarded headers.
    #[must_use]
    pub fn new(cluster: ClusterId, method: HttpMethod, path: impl Into<String>) -> Self {
        let path = path.into();
        Self {
            cluster,
            method,
            path,
            query: None,
            endpoint: None,
            protocol: Protocol::Http1,
            trace: None,
            forward_headers: Vec::new(),
        }
    }

    /// Returns the operation with its forwarded headers replaced (not
    /// extended) by `headers`.
    #[must_use]
    pub fn with_forward_headers(self, headers: Vec<(String, String)>) -> Self {
        Self {
            forward_headers: headers,
            ..self
        }
    }

    /// Returns the operation with its endpoint replaced by `endpoint`
    /// (passing `None` clears it).
    #[must_use]
    pub fn with_endpoint(self, endpoint: Option<String>) -> Self {
        Self { endpoint, ..self }
    }

    /// Returns the operation with its query string replaced by `query`
    /// (passing `None` clears it).
    #[must_use]
    pub fn with_query(self, query: Option<String>) -> Self {
        Self { query, ..self }
    }

    /// Returns the operation with `protocol` replacing the current protocol.
    #[must_use]
    pub fn with_protocol(self, protocol: Protocol) -> Self {
        Self { protocol, ..self }
    }

    /// Returns the operation with its trace context replaced by `trace`
    /// (passing `None` clears it).
    #[must_use]
    pub fn with_trace(self, trace: Option<TraceContext>) -> Self {
        Self { trace, ..self }
    }
}
/// Result of a streaming forward: the response metadata plus a body that is
/// handed over as a [`crate::ByteBody`] rather than a buffered `Vec<u8>`.
pub struct StreamingForward {
    /// Status code reported for the forward.
    pub status: u16,
    /// Response body.
    pub body: crate::ByteBody,
    /// Optional content type associated with the response.
    pub content_type: Option<String>,
    /// Pool-reuse flag.
    pub pool_reuse: bool,
}

/// Hand-written `Debug` that prints the response metadata and leaves out
/// only `body`; the trailing `..` from `finish_non_exhaustive` marks the
/// omission.
impl std::fmt::Debug for StreamingForward {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("StreamingForward")
            .field("status", &self.status)
            // `content_type` is a plain `Option<String>`: show it so the
            // diagnostics match what the derived `Debug` on `CursorOutcome`
            // prints for the same field.
            .field("content_type", &self.content_type)
            .field("pool_reuse", &self.pool_reuse)
            .finish_non_exhaustive()
    }
}
/// Result of a streaming search: the response metadata plus a body that is
/// handed over as a [`crate::ByteBody`] rather than a buffered `Vec<u8>`.
pub struct StreamingSearch {
    /// Status code reported for the search.
    pub status: u16,
    /// Response body.
    pub body: crate::ByteBody,
    /// Pool-reuse flag.
    pub pool_reuse: bool,
}

/// Hand-written `Debug` that prints `status` and `pool_reuse` and leaves out
/// `body`; the trailing `..` from `finish_non_exhaustive` marks the omission.
impl std::fmt::Debug for StreamingSearch {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Destructure so that adding a field forces this impl to be revisited.
        let Self {
            status,
            pool_reuse,
            body: _,
        } = self;

        let mut repr = f.debug_struct("StreamingSearch");
        repr.field("status", status);
        repr.field("pool_reuse", pool_reuse);
        repr.finish_non_exhaustive()
    }
}
/// Fully buffered result of a cursor passthrough request.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CursorOutcome {
    /// Status code reported for the request.
    pub status: u16,
    /// Raw response body bytes.
    pub body: Vec<u8>,
    /// Optional content type; `None` unless set via
    /// [`CursorOutcome::with_content_type`].
    pub content_type: Option<String>,
    /// Pool-reuse flag; `false` until set with [`CursorOutcome::with_pool_reuse`].
    pub pool_reuse: bool,
}

impl CursorOutcome {
    /// Creates an outcome from `status` and `body`, with no content type and
    /// `pool_reuse` initialised to `false`.
    #[must_use]
    pub fn new(status: u16, body: Vec<u8>) -> Self {
        let (content_type, pool_reuse) = (None, false);
        Self {
            status,
            body,
            content_type,
            pool_reuse,
        }
    }

    /// Returns the outcome with its `pool_reuse` flag set to `reused`.
    #[must_use]
    pub fn with_pool_reuse(self, reused: bool) -> Self {
        Self {
            pool_reuse: reused,
            ..self
        }
    }

    /// Returns the outcome with its content type replaced by `content_type`
    /// (passing `None` clears it).
    #[must_use]
    pub fn with_content_type(self, content_type: Option<String>) -> Self {
        Self {
            content_type,
            ..self
        }
    }
}
/// Read-side interface of a sink.
///
/// Implementors must be `Send + Sync`, and every method returns a `Send`
/// future. `get`, `search` and `count` are required. `cursor`,
/// `search_stream` and `forward_stream` have default bodies that resolve to
/// `SinkError::Transport` with a fixed "not supported by this sink" message,
/// so an implementor only overrides the ones it supports.
pub trait Reader: Send + Sync {
/// Performs the read described by `op`.
///
/// Resolves to a [`ReadOutcome`], or a [`SinkError`] on failure.
fn get(
&self,
op: ReadOp,
) -> impl std::future::Future<Output = Result<ReadOutcome, SinkError>> + Send;
/// Performs the search described by `op` and returns the buffered result.
///
/// Resolves to a [`SearchOutcome`], or a [`SinkError`] on failure.
fn search(
&self,
op: SearchOp,
) -> impl std::future::Future<Output = Result<SearchOutcome, SinkError>> + Send;
/// Performs a count for the request described by `op`; it takes the same
/// [`SearchOp`] input as [`Reader::search`].
///
/// Resolves to a [`CountOutcome`], or a [`SinkError`] on failure.
fn count(
&self,
op: SearchOp,
) -> impl std::future::Future<Output = Result<CountOutcome, SinkError>> + Send;
/// Performs the cursor passthrough request described by the [`CursorOp`].
///
/// The default implementation ignores its argument and resolves to
/// `SinkError::Transport`.
fn cursor(
&self,
_op: CursorOp,
) -> impl std::future::Future<Output = Result<CursorOutcome, SinkError>> + Send {
// The block captures nothing; the error value is only built when polled.
async {
Err(SinkError::Transport {
kind: "cursor passthrough not supported by this sink",
})
}
}
/// Performs the search described by the [`SearchOp`], returning the
/// response as a [`StreamingSearch`] instead of a buffered outcome.
///
/// The default implementation ignores its argument and resolves to
/// `SinkError::Transport`.
fn search_stream(
&self,
_op: SearchOp,
) -> impl std::future::Future<Output = Result<StreamingSearch, SinkError>> + Send {
// The block captures nothing; the error value is only built when polled.
async {
Err(SinkError::Transport {
kind: "streaming search not supported by this sink",
})
}
}
/// Forwards the request described by the [`ForwardOp`] together with the
/// separately supplied request body, returning a [`StreamingForward`].
///
/// The default implementation ignores both arguments and resolves to
/// `SinkError::Transport`.
///
/// NOTE(review): the body parameter is spelled `crate::opensearch::ByteBody`,
/// while the streaming result structs in this file use `crate::ByteBody` —
/// presumably a re-export of the same type; confirm.
fn forward_stream(
&self,
_op: ForwardOp,
_body: crate::opensearch::ByteBody,
) -> impl std::future::Future<Output = Result<StreamingForward, SinkError>> + Send {
// The block captures nothing; the error value is only built when polled.
async {
Err(SinkError::Transport {
kind: "streaming forward not supported by this sink",
})
}
}
}