use std::path::PathBuf;
use std::sync::Arc;
use arrow_schema::SchemaRef;
use vgi_rpc::Result;
use crate::arguments::Arguments;
use crate::function::{ArgSpec, BindParams, BindResponse, FunctionMetadata};
use crate::settings::Settings;
use crate::table_function::TableProducer;
pub struct BufferingStore {
base: PathBuf,
}
fn hex(b: &[u8]) -> String {
let mut s = String::with_capacity(b.len() * 2);
for x in b {
s.push_str(&format!("{x:02x}"));
}
s
}
impl Default for BufferingStore {
fn default() -> Self {
BufferingStore::new()
}
}
impl BufferingStore {
pub fn new() -> Self {
let mut base = std::env::temp_dir();
base.push("vgi-rust-buffering");
let _ = std::fs::create_dir_all(&base);
BufferingStore { base }
}
fn log_dir(&self, exec: &[u8], ns: &[u8], key: &[u8]) -> PathBuf {
let mut p = self.base.clone();
p.push(hex(exec));
p.push(format!("{}__{}", hex(ns), hex(key)));
p
}
pub fn append(&self, exec: &[u8], ns: &[u8], key: &[u8], value: Vec<u8>) -> i64 {
use std::io::Write;
let dir = self.log_dir(exec, ns, key);
let _ = std::fs::create_dir_all(&dir);
let mut id = self.max_id(&dir) + 1;
loop {
let path = dir.join(format!("{id:020}.bin"));
match std::fs::OpenOptions::new()
.write(true)
.create_new(true)
.open(&path)
{
Ok(mut f) => {
let _ = f.write_all(&value);
return id;
}
Err(e) if e.kind() == std::io::ErrorKind::AlreadyExists => {
id += 1;
}
Err(_) => return id,
}
}
}
fn max_id(&self, dir: &PathBuf) -> i64 {
std::fs::read_dir(dir)
.into_iter()
.flatten()
.flatten()
.filter_map(|e| {
e.file_name()
.to_str()
.and_then(|n| n.strip_suffix(".bin"))
.and_then(|n| n.parse::<i64>().ok())
})
.max()
.unwrap_or(-1)
}
pub fn scan(
&self,
exec: &[u8],
ns: &[u8],
key: &[u8],
after_id: i64,
limit: usize,
) -> Vec<(i64, Vec<u8>)> {
let dir = self.log_dir(exec, ns, key);
let mut ids: Vec<i64> = std::fs::read_dir(&dir)
.into_iter()
.flatten()
.flatten()
.filter_map(|e| {
e.file_name()
.to_str()
.and_then(|n| n.strip_suffix(".bin"))
.and_then(|n| n.parse::<i64>().ok())
})
.filter(|id| *id > after_id)
.collect();
ids.sort_unstable();
ids.into_iter()
.take(limit)
.filter_map(|id| {
std::fs::read(dir.join(format!("{id:020}.bin")))
.ok()
.map(|v| (id, v))
})
.collect()
}
pub fn clear(&self, exec: &[u8]) {
let mut p = self.base.clone();
p.push(hex(exec));
let _ = std::fs::remove_dir_all(&p);
}
fn kv_path(&self, exec: &[u8], key: &[u8]) -> PathBuf {
let mut p = self.base.clone();
p.push(hex(exec));
let _ = std::fs::create_dir_all(&p);
p.push(format!("kv_{}", hex(key)));
p
}
pub fn kv_put(&self, exec: &[u8], key: &[u8], value: &[u8]) {
let path = self.kv_path(exec, key);
let tmp = path.with_extension("tmp");
if std::fs::write(&tmp, value).is_ok() {
let _ = std::fs::rename(&tmp, &path);
}
}
pub fn kv_get(&self, exec: &[u8], key: &[u8]) -> Option<Vec<u8>> {
std::fs::read(self.kv_path(exec, key)).ok()
}
pub fn kv_del(&self, exec: &[u8], key: &[u8]) {
let _ = std::fs::remove_file(self.kv_path(exec, key));
}
fn queue_dir(&self, exec: &[u8]) -> PathBuf {
let mut p = self.base.clone();
p.push(hex(exec));
p.push("__queue__");
p
}
pub fn queue_push(&self, exec: &[u8], items: &[Vec<u8>]) {
use std::io::Write;
let dir = self.queue_dir(exec);
let _ = std::fs::create_dir_all(&dir);
let mut id = self.max_id(&dir) + 1;
for item in items {
let path = dir.join(format!("{id:020}.bin"));
if let Ok(mut f) = std::fs::OpenOptions::new()
.write(true)
.create_new(true)
.open(&path)
{
let _ = f.write_all(item);
}
id += 1;
}
}
pub fn queue_pop(&self, exec: &[u8], claim_tag: &str) -> Option<Vec<u8>> {
let dir = self.queue_dir(exec);
let mut ids: Vec<i64> = std::fs::read_dir(&dir)
.into_iter()
.flatten()
.flatten()
.filter_map(|e| {
e.file_name()
.to_str()
.and_then(|n| n.strip_suffix(".bin"))
.and_then(|n| n.parse::<i64>().ok())
})
.collect();
ids.sort_unstable();
for id in ids {
let src = dir.join(format!("{id:020}.bin"));
let claimed = dir.join(format!("claimed_{claim_tag}_{id:020}.bin"));
if std::fs::rename(&src, &claimed).is_ok() {
let data = std::fs::read(&claimed).ok();
let _ = std::fs::remove_file(&claimed);
if data.is_some() {
return data;
}
}
}
None
}
}
pub struct BufferingParams {
pub execution_id: Vec<u8>,
pub storage: Arc<BufferingStore>,
pub output_schema: SchemaRef,
pub arguments: Arguments,
pub settings: Settings,
pub attach_opaque_data: Option<Vec<u8>>,
pub batch_index: Option<i64>,
pub logs: Arc<std::sync::Mutex<Vec<String>>>,
}
impl BufferingParams {
pub fn log(&self, message: impl Into<String>) {
if let Ok(mut g) = self.logs.lock() {
g.push(message.into());
}
}
}
pub trait TableBufferingFunction: Send + Sync {
fn name(&self) -> &str;
fn metadata(&self) -> FunctionMetadata;
fn argument_specs(&self) -> Vec<ArgSpec>;
fn on_bind(&self, params: &BindParams) -> Result<BindResponse>;
fn process(
&self,
params: &BufferingParams,
batch: &arrow_array::RecordBatch,
) -> Result<Vec<u8>>;
fn combine(&self, params: &BufferingParams, state_ids: &[Vec<u8>]) -> Result<Vec<Vec<u8>>>;
fn finalize_producer(
&self,
params: &BufferingParams,
finalize_state_id: Vec<u8>,
) -> Result<Box<dyn TableProducer>>;
}