use std::time::Instant;
use tokio_util::sync::CancellationToken;
use tracing::{debug, info};
use crate::error::Result;
use super::progress::ProgressTracker;
use super::streaming::StreamingQuery;
use super::writers::FormatWriter;
/// Summary of a finished (or cancelled) export run.
///
/// Plain data with no invariants between fields, so it can be cloned and
/// compared freely (for example in tests or when forwarding the result).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ExportResult {
    /// Number of documents handed to the writer before the export stopped.
    pub documents_exported: u64,
    /// Size of the output as reported by the writer, in bytes.
    pub file_size_bytes: u64,
    /// Wall-clock duration of the export, in milliseconds.
    pub elapsed_ms: u64,
    /// `true` when the export stopped early because cancellation was requested.
    pub cancelled: bool,
}
/// Drives a single export: pulls document batches from a streaming query,
/// forwards them to a format writer, and reports progress along the way.
pub struct ExportCoordinator {
    /// Source the document batches are fetched from.
    query: Box<dyn StreamingQuery>,
    /// Receives the running count of exported documents.
    tracker: ProgressTracker,
    /// Sink the fetched batches are written to.
    writer: Box<dyn FormatWriter>,
    /// Optional token checked before each batch fetch; `None` disables cancellation.
    cancel_token: Option<CancellationToken>,
}
impl ExportCoordinator {
pub fn new(
query: Box<dyn StreamingQuery>,
tracker: ProgressTracker,
writer: Box<dyn FormatWriter>,
) -> Self {
Self {
query,
tracker,
writer,
cancel_token: None,
}
}
pub fn with_cancellation(mut self, token: CancellationToken) -> Self {
self.cancel_token = Some(token);
self
}
pub async fn execute(&mut self) -> Result<ExportResult> {
let start_time = Instant::now();
info!("Starting export operation");
let mut exported = 0u64;
let mut batch_count = 0u32;
loop {
if let Some(ref token) = self.cancel_token {
if token.is_cancelled() {
info!("Export operation cancelled by user");
let _ = self.writer.finalize().await;
let _ = self.query.close().await;
self.tracker.finish();
let elapsed_ms = start_time.elapsed().as_millis() as u64;
let file_size_bytes = self.writer.file_size().await.unwrap_or(0);
return Ok(ExportResult {
documents_exported: exported,
file_size_bytes,
elapsed_ms,
cancelled: true,
});
}
}
debug!("Fetching batch #{}", batch_count + 1);
match self.query.next_batch().await? {
Some(docs) => {
let count = docs.len();
debug!("Received batch of {} documents", count);
self.writer.write_batch(&docs).await?;
exported += count as u64;
self.tracker.update(exported);
batch_count += 1;
if batch_count % 10 == 0 {
info!(
"Progress: {} documents exported ({} batches)",
exported, batch_count
);
}
}
None => {
debug!("No more documents available");
break;
}
}
}
debug!("Finalizing output file");
self.writer.finalize().await?;
self.query.close().await?;
let elapsed_ms = start_time.elapsed().as_millis() as u64;
self.tracker.finish();
let file_size_bytes = self.writer.file_size().await?;
info!(
"Export completed: {} documents, {} bytes, {} ms",
exported, file_size_bytes, elapsed_ms
);
Ok(ExportResult {
documents_exported: exported,
file_size_bytes,
elapsed_ms,
cancelled: false,
})
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use async_trait::async_trait;
    use mongodb::bson::{doc, Document};

    /// Query stub that replays a fixed list of batches, then reports exhaustion.
    struct MockStreamingQuery {
        batches: std::vec::IntoIter<Vec<Document>>,
    }

    impl MockStreamingQuery {
        fn new(batches: Vec<Vec<Document>>) -> Self {
            Self {
                batches: batches.into_iter(),
            }
        }
    }

    #[async_trait]
    impl StreamingQuery for MockStreamingQuery {
        async fn next_batch(&mut self) -> Result<Option<Vec<Document>>> {
            // Hands each batch out by value; yields `None` once all are consumed.
            Ok(self.batches.next())
        }

        async fn close(&mut self) -> Result<()> {
            Ok(())
        }
    }

    /// Query stub that cancels a token right after producing its first batch,
    /// simulating a user cancelling while the export is in flight.
    struct CancelAfterFirstBatch {
        inner: MockStreamingQuery,
        token: CancellationToken,
    }

    #[async_trait]
    impl StreamingQuery for CancelAfterFirstBatch {
        async fn next_batch(&mut self) -> Result<Option<Vec<Document>>> {
            let batch = self.inner.next_batch().await?;
            self.token.cancel();
            Ok(batch)
        }

        async fn close(&mut self) -> Result<()> {
            self.inner.close().await
        }
    }

    /// Writer stub that keeps every document in memory.
    struct MockWriter {
        written: Vec<Document>,
    }

    impl MockWriter {
        fn new() -> Self {
            Self {
                written: Vec::new(),
            }
        }
    }

    #[async_trait]
    impl FormatWriter for MockWriter {
        async fn write_batch(&mut self, docs: &[Document]) -> Result<usize> {
            self.written.extend_from_slice(docs);
            Ok(docs.len())
        }

        async fn finalize(&mut self) -> Result<()> {
            Ok(())
        }

        async fn file_size(&self) -> Result<u64> {
            // Pretend every written document occupies 100 bytes.
            Ok(self.written.len() as u64 * 100)
        }
    }

    #[tokio::test]
    async fn test_coordinator_basic() {
        let batches = vec![
            vec![doc! { "id": 1 }, doc! { "id": 2 }],
            vec![doc! { "id": 3 }],
        ];
        let query = Box::new(MockStreamingQuery::new(batches));
        let tracker = ProgressTracker::new(Some(3), false);
        let writer = Box::new(MockWriter::new());
        let mut coordinator = ExportCoordinator::new(query, tracker, writer);
        let result = coordinator.execute().await.unwrap();
        assert_eq!(result.documents_exported, 3);
        assert_eq!(result.file_size_bytes, 300);
        assert!(!result.cancelled);
    }

    #[tokio::test]
    async fn test_coordinator_empty_query() {
        let batches: Vec<Vec<Document>> = vec![];
        let query = Box::new(MockStreamingQuery::new(batches));
        let tracker = ProgressTracker::new(None, false);
        let writer = Box::new(MockWriter::new());
        let mut coordinator = ExportCoordinator::new(query, tracker, writer);
        let result = coordinator.execute().await.unwrap();
        assert_eq!(result.documents_exported, 0);
        assert_eq!(result.file_size_bytes, 0);
        assert!(!result.cancelled);
    }

    #[tokio::test]
    async fn test_coordinator_cancelled_before_start() {
        let token = CancellationToken::new();
        token.cancel();
        let query = Box::new(MockStreamingQuery::new(vec![vec![doc! { "id": 1 }]]));
        let tracker = ProgressTracker::new(Some(1), false);
        let writer = Box::new(MockWriter::new());
        let mut coordinator =
            ExportCoordinator::new(query, tracker, writer).with_cancellation(token);
        let result = coordinator.execute().await.unwrap();
        assert!(result.cancelled);
        assert_eq!(result.documents_exported, 0);
        assert_eq!(result.file_size_bytes, 0);
    }

    #[tokio::test]
    async fn test_coordinator_cancelled_mid_stream() {
        let token = CancellationToken::new();
        let batches = vec![
            vec![doc! { "id": 1 }, doc! { "id": 2 }],
            vec![doc! { "id": 3 }],
        ];
        let query = Box::new(CancelAfterFirstBatch {
            inner: MockStreamingQuery::new(batches),
            token: token.clone(),
        });
        let tracker = ProgressTracker::new(Some(3), false);
        let writer = Box::new(MockWriter::new());
        let mut coordinator =
            ExportCoordinator::new(query, tracker, writer).with_cancellation(token);
        let result = coordinator.execute().await.unwrap();
        // The in-flight batch is still written; the second one is never fetched.
        assert!(result.cancelled);
        assert_eq!(result.documents_exported, 2);
        assert_eq!(result.file_size_bytes, 200);
    }
}