use chrono::Utc;
use sift_error::prelude::*;
use sift_pbfs::chunk::PbfsChunk;
use sift_rs::ingest::v1::IngestWithConfigDataStreamRequest;
use std::path::PathBuf;
use tokio::fs::File;
use tokio::io::{AsyncWriteExt, BufWriter};
/// Capacity, in bytes (128 KiB), of the `BufWriter` placed in front of each backup file.
const BACKUP_FILE_BUFFER_WRITE_SIZE: usize = 1024 * 128;
/// Settings controlling where backup files are created, how they are named,
/// and when they should be rotated.
#[derive(Debug, Clone)]
pub(crate) struct FileWriterConfig {
    /// Directory in which new backup files are created.
    pub(crate) directory: PathBuf,
    /// File name prefix; files are named `{prefix}-{timestamp_micros}`.
    pub(crate) prefix: String,
    /// Size in bytes at or above which the current file should be rotated.
    pub(crate) max_size: usize,
}
/// Bookkeeping for the backup file currently being written.
///
/// The `Default` value (zero counts, empty path) represents "no file open".
#[derive(Clone, Debug, Default)]
pub(crate) struct FileContext {
    /// Number of requests appended to the file.
    pub(crate) message_count: usize,
    /// Location of the file on disk.
    // NOTE(review): not read by the non-test code in this file, hence the allow;
    // confirm whether other modules read it before removing the attribute.
    #[allow(dead_code)]
    pub(crate) file_path: PathBuf,
    /// Number of encoded bytes handed to the writer for this file. This may include
    /// bytes still sitting in the in-memory write buffer.
    pub(crate) file_size: usize,
}
/// Appends ingestion requests, encoded as PBFS chunks, to backup files on disk.
///
/// A file is opened lazily on the first write. Callers poll
/// `should_rotate_file` and call `rotate_file` to close it once it reaches
/// the configured size.
pub(crate) struct FileWriter {
    /// Naming and size-limit settings shared by every file this writer creates.
    pub(crate) config: FileWriterConfig,
    /// Counters for the open file; reset to `Default` whenever no file is open.
    pub(crate) current_file_ctx: FileContext,
    /// Buffered handle to the open file; `None` before the first write and after rotation.
    pub(crate) current_file: Option<BufWriter<File>>,
    /// Scratch buffer reused across writes to avoid reallocating for every chunk.
    pub(crate) chunk_encode_buffer: Vec<u8>,
}
impl FileWriter {
pub(crate) fn new(config: FileWriterConfig) -> Self {
Self {
config,
current_file_ctx: FileContext::default(),
current_file: None,
chunk_encode_buffer: Vec::new(),
}
}
pub(crate) async fn write_request(
&mut self,
request: &IngestWithConfigDataStreamRequest,
) -> Result<()> {
if self.current_file.is_none() {
let (path, file) = Self::create_file(&self.config).await?;
self.current_file_ctx = FileContext {
message_count: 0,
file_path: path.clone(),
file_size: 0,
};
self.current_file = Some(BufWriter::with_capacity(
BACKUP_FILE_BUFFER_WRITE_SIZE,
file,
));
}
let Some(file) = self.current_file.as_mut() else {
return Err(Error::new_msg(
ErrorKind::BackupsError,
"current file is not set",
));
};
let encoded_chunk = PbfsChunk::encode_into(
core::slice::from_ref(request),
&mut self.chunk_encode_buffer,
)?;
file.write_all(encoded_chunk).await?;
self.current_file_ctx.message_count += 1;
self.current_file_ctx.file_size += encoded_chunk.len();
Ok(())
}
pub(crate) fn should_rotate_file(&self) -> bool {
self.current_file_ctx.file_size >= self.config.max_size
}
pub(crate) async fn rotate_file(&mut self) -> Result<Option<FileContext>> {
match self.current_file.take() {
Some(mut writer) => {
writer
.flush()
.await
.map_err(|e| Error::new(ErrorKind::BackupsError, e))?;
let file = writer.into_inner();
if let Err(e) = file.sync_all().await {
#[cfg(feature = "tracing")]
tracing::warn!("unable to sync file, data may be lost: {e:?}");
}
let ctx = self.current_file_ctx.clone();
self.current_file_ctx = FileContext::default();
Ok(Some(ctx))
}
None => Ok(None),
}
}
pub(crate) async fn flush(&mut self) -> Result<()> {
if let Some(writer) = self.current_file.as_mut() {
writer
.flush()
.await
.map_err(|e| Error::new(ErrorKind::BackupsError, e))?;
}
Ok(())
}
pub(crate) async fn sync(&mut self) -> Result<()> {
if let Some(writer) = self.current_file.take() {
let file = writer.into_inner();
file.sync_all()
.await
.map_err(|e| Error::new(ErrorKind::BackupsError, e))?;
self.current_file = Some(BufWriter::with_capacity(
BACKUP_FILE_BUFFER_WRITE_SIZE,
file,
));
}
Ok(())
}
async fn create_file(config: &FileWriterConfig) -> Result<(PathBuf, File)> {
let file_path = config.directory.join(format!(
"{}-{}",
config.prefix,
Utc::now().timestamp_micros()
));
let file = File::create(&file_path)
.await
.map_err(|e| Error::new(ErrorKind::BackupsError, e))
.context("failed to generate file")
.help("please contact Sift")?;
Ok((file_path, file))
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use sift_rs::ingest::v1::IngestWithConfigDataStreamRequest;
    use tempdir::TempDir;
    use uuid::Uuid;

    /// Builds a request with random IDs for the given flow.
    fn create_test_request(flow: &str) -> IngestWithConfigDataStreamRequest {
        IngestWithConfigDataStreamRequest {
            ingestion_config_id: Uuid::new_v4().to_string(),
            flow: flow.to_string(),
            timestamp: None,
            channel_values: vec![],
            run_id: Uuid::new_v4().to_string(),
            end_stream_on_validation_error: false,
            organization_id: Uuid::new_v4().to_string(),
        }
    }

    /// Builds a writer config rooted at `temp_dir`.
    fn test_config(temp_dir: &TempDir, prefix: &str, max_size: usize) -> FileWriterConfig {
        FileWriterConfig {
            directory: temp_dir.path().to_path_buf(),
            prefix: prefix.to_string(),
            max_size,
        }
    }

    /// Writes `request` until the writer asks to rotate, giving up after
    /// `max_writes` writes. Returns the number of writes performed.
    async fn write_until_rotation(
        writer: &mut FileWriter,
        request: &IngestWithConfigDataStreamRequest,
        max_writes: usize,
    ) -> usize {
        for written in 1..=max_writes {
            writer
                .write_request(request)
                .await
                .expect("failed to write request");
            if writer.should_rotate_file() {
                return written;
            }
        }
        max_writes
    }

    /// Size of `path` on disk, as a `usize` comparable with `FileContext::file_size`.
    fn on_disk_len(path: &std::path::Path) -> usize {
        let len = std::fs::metadata(path)
            .expect("failed to stat backup file")
            .len();
        usize::try_from(len).expect("file length fits in usize")
    }

    #[tokio::test]
    async fn test_file_writer_new() {
        let temp_dir = TempDir::new("test_file_writer").unwrap();
        let writer = FileWriter::new(test_config(&temp_dir, "test", 1024));
        assert_eq!(writer.current_file_ctx.message_count, 0);
        assert_eq!(writer.current_file_ctx.file_size, 0);
        assert!(writer.current_file.is_none());
        assert!(!writer.should_rotate_file());
    }

    #[tokio::test]
    async fn test_file_writer_write_request_creates_file() {
        let temp_dir = TempDir::new("test_file_writer").unwrap();
        let mut writer = FileWriter::new(test_config(&temp_dir, "test", 1024 * 1024));
        let request = create_test_request("test_flow");
        assert!(writer.current_file.is_none());
        writer
            .write_request(&request)
            .await
            .expect("failed to write request");
        assert!(writer.current_file.is_some());
        assert_eq!(writer.current_file_ctx.message_count, 1);
        assert!(writer.current_file_ctx.file_size > 0);
        assert!(writer.current_file_ctx.file_path.exists());
    }

    #[tokio::test]
    async fn test_file_writer_write_multiple_requests() {
        let temp_dir = TempDir::new("test_file_writer").unwrap();
        let mut writer = FileWriter::new(test_config(&temp_dir, "test", 1024 * 1024));
        for i in 0..5 {
            let request = create_test_request(&format!("flow_{i}"));
            writer
                .write_request(&request)
                .await
                .expect("failed to write request");
        }
        assert_eq!(writer.current_file_ctx.message_count, 5);
        assert!(writer.current_file_ctx.file_size > 0);
    }

    #[tokio::test]
    async fn test_file_writer_should_rotate_file() {
        let temp_dir = TempDir::new("test_file_writer").unwrap();
        let mut writer = FileWriter::new(test_config(&temp_dir, "test", 100));
        let request = create_test_request("test_flow");
        assert!(!writer.should_rotate_file());
        write_until_rotation(&mut writer, &request, 100).await;
        assert!(writer.should_rotate_file());
    }

    #[tokio::test]
    async fn test_file_writer_rotate_file() {
        let temp_dir = TempDir::new("test_file_writer").unwrap();
        let max_size = 100;
        let mut writer = FileWriter::new(test_config(&temp_dir, "test", max_size));
        let request = create_test_request("test_flow");
        let writes = write_until_rotation(&mut writer, &request, 100).await;
        assert!(writer.should_rotate_file());

        let ctx = writer
            .rotate_file()
            .await
            .expect("failed to rotate file")
            .expect("an open file should have been rotated");
        assert_eq!(ctx.message_count, writes);
        assert!(ctx.file_size >= max_size);
        assert!(ctx.file_path.exists());
        // Rotation flushes, so everything accounted for must be on disk.
        assert_eq!(on_disk_len(&ctx.file_path), ctx.file_size);

        assert!(writer.current_file.is_none());
        assert_eq!(writer.current_file_ctx.message_count, 0);
        assert_eq!(writer.current_file_ctx.file_size, 0);
        assert!(!writer.should_rotate_file());
    }

    #[tokio::test]
    async fn test_file_writer_rotate_file_no_file() {
        let temp_dir = TempDir::new("test_file_writer").unwrap();
        let mut writer = FileWriter::new(test_config(&temp_dir, "test", 1024));
        let result = writer.rotate_file().await.expect("failed to rotate file");
        assert!(result.is_none());
    }

    #[tokio::test]
    async fn test_file_writer_flush() {
        let temp_dir = TempDir::new("test_file_writer").unwrap();
        let mut writer = FileWriter::new(test_config(&temp_dir, "test", 1024));
        let request = create_test_request("test_flow");
        writer.flush().await.expect("flush with no file should succeed");
        writer
            .write_request(&request)
            .await
            .expect("failed to write request");
        writer.flush().await.expect("failed to flush");
        assert_eq!(
            on_disk_len(&writer.current_file_ctx.file_path),
            writer.current_file_ctx.file_size
        );
    }

    #[tokio::test]
    async fn test_file_writer_sync() {
        let temp_dir = TempDir::new("test_file_writer").unwrap();
        let mut writer = FileWriter::new(test_config(&temp_dir, "test", 1024));
        writer.sync().await.expect("sync with no file should succeed");
        let request = create_test_request("test_flow");
        writer
            .write_request(&request)
            .await
            .expect("failed to write request");
        writer.sync().await.expect("failed to sync");
        assert!(writer.current_file.is_some());
    }

    #[tokio::test]
    async fn test_file_writer_current_file_context() {
        let temp_dir = TempDir::new("test_file_writer").unwrap();
        let writer = FileWriter::new(test_config(&temp_dir, "test", 1024));
        assert_eq!(writer.current_file_ctx.message_count, 0);
        assert_eq!(writer.current_file_ctx.file_size, 0);
    }

    #[tokio::test]
    async fn test_file_writer_file_naming() {
        let temp_dir = TempDir::new("test_file_writer").unwrap();
        let mut writer = FileWriter::new(test_config(&temp_dir, "backup", 1024));
        let request = create_test_request("test_flow");
        writer
            .write_request(&request)
            .await
            .expect("failed to write request");
        let file_path = writer.current_file_ctx.file_path.clone();
        let file_name = file_path.file_name().unwrap().to_string_lossy();
        let suffix = file_name
            .strip_prefix("backup-")
            .expect("file name should start with the configured prefix");
        assert!(
            suffix.parse::<i64>().is_ok(),
            "unexpected file name suffix: {suffix}"
        );
    }

    #[tokio::test]
    async fn test_file_writer_multiple_rotations() {
        let temp_dir = TempDir::new("test_file_writer").unwrap();
        let mut writer = FileWriter::new(test_config(&temp_dir, "test", 100));
        let request = create_test_request("test_flow");
        let mut rotated_files = Vec::new();
        for _ in 0..3 {
            write_until_rotation(&mut writer, &request, 100).await;
            assert!(writer.should_rotate_file());
            let ctx = writer
                .rotate_file()
                .await
                .expect("failed to rotate file")
                .expect("an open file should have been rotated");
            rotated_files.push(ctx.file_path);
        }
        assert_eq!(rotated_files.len(), 3);
        for file_path in &rotated_files {
            assert!(file_path.exists());
        }
    }
}