use osproxy_core::{Epoch, Target, TraceContext};
use osproxy_spi::Protocol;
#[derive(Clone, PartialEq, Eq, Debug)]
pub struct WriteOp {
pub target: Target,
pub doc: DocOp,
pub epoch: Epoch,
pub protocol: Protocol,
pub trace: Option<TraceContext>,
pub forward_headers: Vec<(String, String)>,
}
impl WriteOp {
#[must_use]
pub fn new(target: Target, doc: DocOp, epoch: Epoch) -> Self {
Self {
target,
doc,
epoch,
protocol: Protocol::Http1,
trace: None,
forward_headers: Vec::new(),
}
}
#[must_use]
pub fn with_protocol(mut self, protocol: Protocol) -> Self {
self.protocol = protocol;
self
}
#[must_use]
pub fn with_trace(mut self, trace: Option<TraceContext>) -> Self {
self.trace = trace;
self
}
#[must_use]
pub fn with_forward_headers(mut self, headers: Vec<(String, String)>) -> Self {
self.forward_headers = headers;
self
}
}
#[derive(Clone, PartialEq, Eq, Debug)]
pub enum DocOp {
Index {
id: Option<String>,
routing: Option<String>,
body: bytes::Bytes,
},
Create {
id: Option<String>,
routing: Option<String>,
body: bytes::Bytes,
},
Update {
id: String,
routing: Option<String>,
body: bytes::Bytes,
},
Delete {
id: String,
routing: Option<String>,
},
}
#[derive(Clone, PartialEq, Eq, Debug, Default)]
pub struct WriteBatch {
ops: Vec<WriteOp>,
}
impl WriteBatch {
#[must_use]
pub fn new() -> Self {
Self::default()
}
#[must_use]
pub fn single(op: WriteOp) -> Self {
Self { ops: vec![op] }
}
#[must_use]
pub fn with(mut self, op: WriteOp) -> Self {
self.ops.push(op);
self
}
#[must_use]
pub fn with_trace(mut self, trace: Option<&TraceContext>) -> Self {
for op in &mut self.ops {
op.trace = trace.cloned();
}
self
}
#[must_use]
pub fn with_forward_headers(mut self, headers: &[(String, String)]) -> Self {
for op in &mut self.ops {
op.forward_headers = headers.to_vec();
}
self
}
#[must_use]
pub fn ops(&self) -> &[WriteOp] {
&self.ops
}
#[must_use]
pub fn is_empty(&self) -> bool {
self.ops.is_empty()
}
#[must_use]
pub fn len(&self) -> usize {
self.ops.len()
}
}
#[cfg(test)]
mod tests {
use super::*;
use osproxy_core::{ClusterId, IndexName};
fn op(id: &str) -> WriteOp {
WriteOp::new(
Target::new(ClusterId::from("c"), IndexName::from("i")),
DocOp::Index {
id: Some(id.to_owned()),
routing: Some("p".to_owned()),
body: bytes::Bytes::from_static(b"{}"),
},
Epoch::new(1),
)
}
#[test]
fn single_batch_has_one_op() {
let b = WriteBatch::single(op("x"));
assert_eq!(b.len(), 1);
assert!(!b.is_empty());
assert_eq!(b.ops()[0].epoch, Epoch::new(1));
}
#[test]
fn empty_and_builder() {
let b = WriteBatch::new();
assert!(b.is_empty());
let b = b.with(op("a")).with(op("b"));
assert_eq!(b.len(), 2);
}
}