use std::collections::HashMap;
use crate::{point::Point, precision::Precision};
/// Configuration handed to [`WriteInput::into_lp_batches`] when a payload is
/// converted into batches.
///
/// The `Vec<Point>` conversion reads `precision`, `default_tags`, `tag_order`
/// and `batch_size`; the `&str` and `String` conversions ignore the options.
/// The remaining fields are not read by those conversions.
#[derive(Debug, Clone)]
pub struct WriteOptions {
/// Precision forwarded to each point's `write_line_protocol` call
/// (the `Default` impl uses `Precision::Nanosecond`).
pub precision: Precision,
/// Tag map forwarded to each point's `write_line_protocol` call; how it is
/// combined with a point's own tags is decided by that method.
pub default_tags: HashMap<String, String>,
/// NOTE(review): not read by the conversions next to this struct; presumably
/// the payload size above which bodies are gzip-compressed, with `None`
/// disabling compression — confirm against the sending code.
pub gzip_threshold: Option<usize>,
/// NOTE(review): not read by the conversions next to this struct; meaning to
/// be confirmed against the sending code.
pub no_sync: bool,
/// NOTE(review): not read by the conversions next to this struct; presumably
/// controls whether partially accepted writes are tolerated — confirm.
pub accept_partial: bool,
/// NOTE(review): not read by the conversions next to this struct; presumably
/// selects the v2 write endpoint — confirm.
pub use_v2_api: bool,
/// Tag ordering forwarded to each point's `write_line_protocol` call; an
/// empty list is forwarded as `None`.
pub tag_order: Vec<String>,
/// Maximum number of points serialized into one batch by the `Vec<Point>`
/// conversion; that conversion treats values below 1 as 1.
pub batch_size: usize,
/// NOTE(review): not read by the conversions next to this struct; presumably
/// a cap on concurrently outstanding requests — confirm.
pub max_inflight: usize,
}
/// Value that `WriteOptions::default()` assigns to `batch_size`.
pub const DEFAULT_BATCH_SIZE: usize = 5_000;
/// Value that `WriteOptions::default()` assigns to `max_inflight`.
pub const DEFAULT_MAX_INFLIGHT: usize = 4;
/// Baseline configuration used when the caller does not override anything.
impl Default for WriteOptions {
    /// Builds the stock options:
    ///
    /// * `precision`: `Precision::Nanosecond`
    /// * `default_tags` and `tag_order`: empty
    /// * `gzip_threshold`: `Some(1024)`
    /// * `no_sync`: `false`, `accept_partial`: `true`, `use_v2_api`: `false`
    /// * `batch_size`: [`DEFAULT_BATCH_SIZE`]
    /// * `max_inflight`: [`DEFAULT_MAX_INFLIGHT`]
    fn default() -> Self {
        Self {
            precision: Precision::Nanosecond,
            default_tags: HashMap::default(),
            gzip_threshold: Some(1024),
            no_sync: false,
            accept_partial: true,
            use_v2_api: false,
            tag_order: Vec::default(),
            batch_size: DEFAULT_BATCH_SIZE,
            max_inflight: DEFAULT_MAX_INFLIGHT,
        }
    }
}
/// A payload that can be turned into a sequence of encoded batches.
///
/// Implemented nearby for `&str`, `String` and `Vec<Point>`. The `lp` in the
/// method name presumably stands for "line protocol", matching
/// `Point::write_line_protocol`.
pub trait WriteInput {
/// Consumes the input and returns a boxed, `Send` iterator of batches.
///
/// Each item is either the bytes of one batch or the error that occurred
/// while producing it. `opts` is available to implementations that need it;
/// the string implementations ignore it.
fn into_lp_batches(
self,
opts: &WriteOptions,
) -> Box<dyn Iterator<Item = crate::Result<Vec<u8>>> + Send>;
}
/// Treats a borrowed string as one ready-made payload.
impl WriteInput for &str {
    /// Yields the string's bytes as a single batch, or no batch at all when the
    /// string is empty.
    ///
    /// The options are not consulted; the text is copied through unchanged.
    fn into_lp_batches(
        self,
        _opts: &WriteOptions,
    ) -> Box<dyn Iterator<Item = crate::Result<Vec<u8>>> + Send> {
        // Guard clause: nothing to send for an empty payload.
        if self.is_empty() {
            return Box::new(std::iter::empty());
        }
        let payload = self.as_bytes().to_vec();
        Box::new(std::iter::once(Ok(payload)))
    }
}
/// Treats an owned string as one ready-made payload.
impl WriteInput for String {
    /// Yields the string's bytes as a single batch without copying them, or no
    /// batch at all when the string is empty.
    ///
    /// The options are not consulted; the text is passed through unchanged.
    fn into_lp_batches(
        self,
        _opts: &WriteOptions,
    ) -> Box<dyn Iterator<Item = crate::Result<Vec<u8>>> + Send> {
        // Take over the string's buffer first; an empty string has no bytes.
        let payload = self.into_bytes();
        if payload.is_empty() {
            return Box::new(std::iter::empty());
        }
        Box::new(std::iter::once(Ok(payload)))
    }
}
/// Iterator state for serializing an owned list of points batch by batch.
///
/// Its `Iterator` implementation yields one byte buffer per batch.
pub(crate) struct PointBatchIter {
/// All points to serialize, in the order they will be written.
points: Vec<Point>,
/// Index of the first point that has not been emitted yet; once it reaches
/// `points.len()` the iterator is exhausted.
idx: usize,
/// Maximum number of points written into a single batch.
batch_size: usize,
/// Precision forwarded to each point's `write_line_protocol` call.
precision: Precision,
/// Tag map forwarded to each point's `write_line_protocol` call.
default_tags: HashMap<String, String>,
/// Tag ordering forwarded to each point's `write_line_protocol` call; an
/// empty list is forwarded as `None`.
tag_order: Vec<String>,
}
/// Serializes the owned points into newline-separated payloads, one batch per
/// call to `next`.
impl Iterator for PointBatchIter {
    type Item = crate::Result<Vec<u8>>;

    /// Returns the next batch, or `None` once every point has been consumed.
    ///
    /// Each call serializes up to `batch_size` points via
    /// `Point::write_line_protocol`, separating them with `\n` and leaving no
    /// trailing newline. A `batch_size` of zero is treated as one so the
    /// iterator always makes progress.
    ///
    /// If a point fails to serialize, that error is returned as the item, the
    /// partially built batch is discarded, and the iterator is exhausted:
    /// subsequent calls return `None`.
    fn next(&mut self) -> Option<Self::Item> {
        // Assumed bytes per serialized point; only used to pre-size the buffer.
        const LINE_SIZE_HINT: usize = 64;

        let total = self.points.len();
        if self.idx >= total {
            return None;
        }

        // A zero step would leave `idx` unchanged and yield empty batches
        // forever, so always advance by at least one point.
        let step = self.batch_size.max(1);
        // Saturate so an extremely large batch size cannot wrap around.
        let end = self.idx.saturating_add(step).min(total);

        let mut buf = Vec::with_capacity((end - self.idx) * LINE_SIZE_HINT);
        // The serializer takes `None` rather than an empty ordering.
        let tag_order = if self.tag_order.is_empty() {
            None
        } else {
            Some(self.tag_order.as_slice())
        };
        // Scratch buffer shared by all points of this batch.
        let mut key_scratch = Vec::new();

        for point in &self.points[self.idx..end] {
            if let Err(e) = point.write_line_protocol(
                &mut buf,
                self.precision,
                &self.default_tags,
                tag_order,
                &mut key_scratch,
            ) {
                // Stop for good: skip every remaining point after a failure.
                self.idx = total;
                return Some(Err(e));
            }
            buf.push(b'\n');
        }

        // Drop the separator that follows the last point of the batch.
        if buf.last() == Some(&b'\n') {
            buf.pop();
        }
        self.idx = end;
        Some(Ok(buf))
    }
}
impl WriteInput for Vec<Point> {
fn into_lp_batches(
self,
opts: &WriteOptions,
) -> Box<dyn Iterator<Item = crate::Result<Vec<u8>>> + Send> {
Box::new(PointBatchIter {
points: self,
idx: 0,
batch_size: opts.batch_size.max(1),
precision: opts.precision,
default_tags: opts.default_tags.clone(),
tag_order: opts.tag_order.clone(),
})
}
}