use arrow::array::RecordBatch;
use arrow::datatypes::SchemaRef;
use super::error::{codes, ApiError};
use crate::UntypedSourceHandle;
/// Write-side wrapper around an [`UntypedSourceHandle`].
///
/// Batches passed to `write` are forwarded to the handle after a closed
/// check and a column-count check; watermark calls are forwarded directly.
pub struct Writer {
/// Underlying source handle that receives batches and watermarks.
handle: UntypedSourceHandle,
/// Closed flag: starts `false` in `new`, is set to `true` by `close`, and
/// makes `write`/`flush` return a `WRITER_CLOSED` error when `true`.
closed: bool,
}
impl Writer {
    /// Creates an open writer that forwards to `handle`.
    pub(crate) fn new(handle: UntypedSourceHandle) -> Self {
        Self {
            closed: false,
            handle,
        }
    }

    /// Returns `Ok(())` while the writer is open, or the `WRITER_CLOSED`
    /// ingestion error once the closed flag is set.
    fn ensure_open(&self) -> Result<(), ApiError> {
        if self.closed {
            Err(ApiError::Ingestion {
                code: codes::WRITER_CLOSED,
                message: "Writer is closed".into(),
            })
        } else {
            Ok(())
        }
    }

    /// Pushes one record batch into the underlying source handle.
    ///
    /// Only the number of columns is compared against the handle's schema
    /// here; field names and data types are not inspected by this method.
    ///
    /// # Errors
    ///
    /// * `codes::WRITER_CLOSED` if the writer is marked closed.
    /// * `codes::BATCH_SCHEMA_MISMATCH` if the batch has a different number
    ///   of columns than the handle's schema.
    /// * An `ApiError::ingestion` carrying the stringified error returned by
    ///   the handle's `push_arrow` if the push itself fails.
    pub fn write(&mut self, batch: RecordBatch) -> Result<(), ApiError> {
        self.ensure_open()?;

        let expected_cols = self.handle.schema().fields().len();
        let actual_cols = batch.schema().fields().len();
        if expected_cols != actual_cols {
            return Err(ApiError::Ingestion {
                code: codes::BATCH_SCHEMA_MISMATCH,
                message: format!(
                    "Schema mismatch: expected {expected_cols} columns, got {actual_cols}"
                ),
            });
        }

        match self.handle.push_arrow(batch) {
            Ok(()) => Ok(()),
            Err(source) => Err(ApiError::ingestion(source.to_string())),
        }
    }

    /// Checks that the writer is still open; performs no other work.
    ///
    /// # Errors
    ///
    /// Returns `codes::WRITER_CLOSED` if the writer is marked closed.
    pub fn flush(&mut self) -> Result<(), ApiError> {
        self.ensure_open()
    }

    /// Consumes the writer, marking it closed before it is dropped.
    ///
    /// # Errors
    ///
    /// Never fails in the current implementation.
    // NOTE(review): the `Result` return is presumably kept for API stability
    // even though this always returns `Ok` — hence the lint allowance; confirm.
    #[allow(clippy::unnecessary_wraps)]
    pub fn close(mut self) -> Result<(), ApiError> {
        self.closed = true;
        Ok(())
    }

    /// Returns a clone of the schema reported by the underlying handle.
    #[must_use]
    pub fn schema(&self) -> SchemaRef {
        self.handle.schema().clone()
    }

    /// Returns the name reported by the underlying handle.
    #[must_use]
    pub fn name(&self) -> &str {
        self.handle.name()
    }

    /// Forwards `timestamp` to the underlying handle's `watermark`.
    pub fn watermark(&self, timestamp: i64) {
        self.handle.watermark(timestamp);
    }

    /// Returns the watermark currently reported by the underlying handle.
    #[must_use]
    pub fn current_watermark(&self) -> i64 {
        self.handle.current_watermark()
    }
}
// SAFETY: NOTE(review): this asserts that a `Writer` may be moved to another
// thread. `Writer` holds only an `UntypedSourceHandle` and a `bool`, so the
// claim rests entirely on the handle being safe to send — its definition is
// not in this file; confirm. If the handle is already `Send`, this manual
// impl is redundant and can be removed.
unsafe impl Send for Writer {}
#[cfg(test)]
mod tests {
    use super::Writer;

    /// Compile-time check that `Writer` implements `Send`; the test body
    /// does nothing at runtime beyond calling an empty generic function.
    #[test]
    fn test_writer_send() {
        fn requires_send<T>()
        where
            T: Send,
        {
        }

        requires_send::<Writer>();
    }
}