use crate::core::error::{Error, Result};
use crate::dataframe::DataFrame;
use std::collections::HashMap;
use std::future::Future;
use std::path::Path;
use std::pin::Pin;
use std::sync::Arc;
/// A pluggable file-format backend that can read and write [`DataFrame`]s
/// from filesystem paths or in-memory byte buffers.
///
/// Implementations are stored behind `Arc<dyn FileFormat>` in a
/// [`FormatRegistry`], hence the `Send + Sync` bounds.
pub trait FileFormat: Send + Sync {
    /// Unique name of this format; used as the key when registering the
    /// format in a [`FormatRegistry`].
    fn format_name(&self) -> &'static str;

    /// File extensions (without the leading dot) this format claims.
    fn file_extensions(&self) -> Vec<&'static str>;

    /// MIME types associated with this format.
    fn mime_types(&self) -> Vec<&'static str>;

    /// Returns `true` if this format believes it can read the file at
    /// `path` (e.g. by inspecting the extension).
    fn can_handle_file(&self, path: &Path) -> bool;

    /// Returns `true` if this format recognizes the raw byte content
    /// (e.g. by a magic-number prefix).
    fn can_handle_data(&self, data: &[u8]) -> bool;

    /// Reads a DataFrame from `path` using format-specific `options`.
    fn read_from_path_with_options(
        &self,
        path: &Path,
        options: &HashMap<String, String>,
    ) -> Result<DataFrame>;

    /// Reads a DataFrame from an in-memory byte buffer.
    fn read_from_bytes_with_options(
        &self,
        data: &[u8],
        options: &HashMap<String, String>,
    ) -> Result<DataFrame>;

    /// Writes `df` to `path` using format-specific `options`.
    fn write_to_path_with_options(
        &self,
        df: &DataFrame,
        path: &Path,
        options: &HashMap<String, String>,
    ) -> Result<()>;

    /// Serializes `df` into an in-memory byte buffer.
    fn write_to_bytes_with_options(
        &self,
        df: &DataFrame,
        options: &HashMap<String, String>,
    ) -> Result<Vec<u8>>;

    /// Format-specific metadata for the file at `path`
    /// (implementation-defined keys and values).
    fn get_metadata(&self, path: &Path) -> Result<HashMap<String, String>>;

    /// Validates an option map before use; returns `Err` when an option
    /// is unsupported or malformed.
    fn validate_options(&self, options: &HashMap<String, String>) -> Result<()>;

    /// Default option map used by the convenience methods below.
    fn default_options(&self) -> HashMap<String, String>;

    /// Static description of what this format supports.
    fn capabilities(&self) -> FormatCapabilities;

    /// Convenience: read from `path` with [`Self::default_options`].
    fn read_from_path(&self, path: &Path) -> Result<DataFrame> {
        self.read_from_path_with_options(path, &self.default_options())
    }

    /// Convenience: write `df` to `path` with [`Self::default_options`].
    fn write_to_path(&self, df: &DataFrame, path: &Path) -> Result<()> {
        self.write_to_path_with_options(df, path, &self.default_options())
    }
}
/// Static description of what a [`FileFormat`] implementation supports.
#[derive(Debug, Clone)]
pub struct FormatCapabilities {
    /// Reading is implemented.
    pub can_read: bool,
    /// Writing is implemented.
    pub can_write: bool,
    /// Streaming (incremental) I/O is supported.
    pub supports_streaming: bool,
    /// Schema evolution across file versions is supported.
    pub supports_schema_evolution: bool,
    /// Compressed payloads are supported.
    pub supports_compression: bool,
    /// Encrypted payloads are supported.
    pub supports_encryption: bool,
    /// Random (non-sequential) access within a file is supported.
    pub supports_random_access: bool,
    /// Appending to an existing file is supported.
    pub supports_append: bool,
    /// Maximum supported file size, if bounded; `None` means unlimited.
    /// NOTE(review): units are presumably bytes — confirm with implementations.
    pub max_file_size: Option<usize>,
    /// Logical data types the format can represent.
    pub supported_types: Vec<FormatDataType>,
}
/// Logical data types a file format may be able to represent
/// (see [`FormatCapabilities::supported_types`]).
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum FormatDataType {
    Boolean,
    Integer,
    Float,
    String,
    DateTime,
    /// Raw binary payloads.
    Binary,
    /// Nested / struct-like values.
    Nested,
    Array,
    Map,
}
/// Async streaming backend (message queue, log system, …).
///
/// Methods return boxed futures rather than `async fn` so the trait stays
/// object-safe; it is stored as `Arc<dyn StreamingOps>` in [`FormatRegistry`].
pub trait StreamingOps: Send + Sync {
    /// Creates (or opens) a stream described by `config` and returns a handle.
    fn create_stream(
        &self,
        config: &HashMap<String, String>,
    ) -> Pin<Box<dyn Future<Output = Result<Box<dyn StreamHandle>>> + Send + '_>>;

    /// Lists the names of the streams available from this provider.
    fn list_streams(&self) -> Pin<Box<dyn Future<Output = Result<Vec<String>>> + Send + '_>>;

    /// Checks whether a stream with the given name exists.
    fn stream_exists(
        &self,
        stream_name: &str,
    ) -> Pin<Box<dyn Future<Output = Result<bool>> + Send + '_>>;

    /// Static description of this provider's streaming capabilities.
    fn capabilities(&self) -> StreamingCapabilities;
}
/// Handle to an open stream supporting batched async reads and writes.
pub trait StreamHandle: Send + Sync {
    /// Reads up to `batch_size` records as a DataFrame.
    /// NOTE(review): `Ok(None)` presumably signals end of stream —
    /// confirm against concrete implementations.
    fn read_batch(
        &self,
        batch_size: usize,
    ) -> Pin<Box<dyn Future<Output = Result<Option<DataFrame>>> + Send + '_>>;

    /// Writes one DataFrame batch to the stream.
    fn write_batch(&self, df: &DataFrame) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>>;

    /// Closes the stream, releasing any underlying resources.
    fn close(&self) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>>;

    /// Provider-specific metadata about this stream.
    fn metadata(&self) -> HashMap<String, String>;
}
/// Static description of a [`StreamingOps`] provider's capabilities.
#[derive(Debug, Clone)]
pub struct StreamingCapabilities {
    /// Real-time (low-latency) delivery is supported.
    pub supports_realtime: bool,
    /// Batched reads/writes are supported.
    pub supports_batch: bool,
    /// Windowed operations are supported.
    pub supports_windowing: bool,
    /// Exactly-once delivery semantics are supported.
    pub supports_exactly_once: bool,
    /// Schema changes on a live stream are supported.
    pub supports_schema_evolution: bool,
    /// Maximum throughput, if bounded.
    /// NOTE(review): units unspecified — presumably records or bytes per
    /// second; confirm with providers.
    pub max_throughput: Option<u64>,
    /// Maximum latency, if bounded.
    /// NOTE(review): units unspecified — presumably milliseconds; confirm.
    pub max_latency: Option<u64>,
    /// Serialization formats the provider can produce or consume.
    pub serialization_formats: Vec<SerializationFormat>,
}
/// Wire/serialization formats a streaming provider may support.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SerializationFormat {
    Json,
    Avro,
    Protobuf,
    MessagePack,
    Parquet,
    Arrow,
    /// Provider-specific format not covered by the variants above.
    Custom,
}
/// Central registry mapping names to file-format backends and streaming
/// providers. Entries are stored behind `Arc` so lookups hand out cheap
/// reference-counted clones.
pub struct FormatRegistry {
    /// Keyed by [`FileFormat::format_name`].
    formats: HashMap<String, Arc<dyn FileFormat>>,
    /// Keyed by the caller-supplied name at registration time.
    streaming_providers: HashMap<String, Arc<dyn StreamingOps>>,
}
impl FormatRegistry {
pub fn new() -> Self {
Self {
formats: HashMap::new(),
streaming_providers: HashMap::new(),
}
}
pub fn register_format<F: FileFormat + 'static>(&mut self, format: F) {
let name = format.format_name().to_string();
self.formats.insert(name, Arc::new(format));
}
pub fn register_streaming_provider<S: StreamingOps + 'static>(
&mut self,
name: String,
provider: S,
) {
self.streaming_providers.insert(name, Arc::new(provider));
}
pub fn get_format(&self, name: &str) -> Option<Arc<dyn FileFormat>> {
self.formats.get(name).cloned()
}
pub fn get_streaming_provider(&self, name: &str) -> Option<Arc<dyn StreamingOps>> {
self.streaming_providers.get(name).cloned()
}
pub fn detect_format(&self, path: &Path) -> Option<Arc<dyn FileFormat>> {
for format in self.formats.values() {
if format.can_handle_file(path) {
return Some(Arc::clone(format));
}
}
None
}
pub fn detect_format_from_data(&self, data: &[u8]) -> Option<Arc<dyn FileFormat>> {
for format in self.formats.values() {
if format.can_handle_data(data) {
return Some(Arc::clone(format));
}
}
None
}
pub fn list_formats(&self) -> Vec<String> {
self.formats.keys().cloned().collect()
}
pub fn list_streaming_providers(&self) -> Vec<String> {
self.streaming_providers.keys().cloned().collect()
}
pub fn get_all_capabilities(&self) -> HashMap<String, FormatCapabilities> {
self.formats
.iter()
.map(|(name, format)| (name.clone(), format.capabilities()))
.collect()
}
}
impl Default for FormatRegistry {
fn default() -> Self {
Self::new()
}
}
/// High-level async data-movement operations built on top of the format
/// and streaming abstractions. Boxed futures keep the trait object-safe.
pub trait DataOperations {
    /// Reads a DataFrame from `source`.
    fn read_data(
        &self,
        source: &DataSource,
    ) -> Pin<Box<dyn Future<Output = Result<DataFrame>> + Send + '_>>;

    /// Writes `df` to `destination`.
    fn write_data(
        &self,
        df: &DataFrame,
        destination: &DataDestination,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>>;

    /// Copies data from `source` to `destination`.
    fn copy_data(
        &self,
        source: &DataSource,
        destination: &DataDestination,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>>;

    /// Reads from `source`, applies the transform `pipeline`, and writes
    /// the result to `destination`.
    fn transform_data(
        &self,
        source: &DataSource,
        pipeline: &TransformPipeline,
        destination: &DataDestination,
    ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + '_>>;
}
/// Where data is read from by a [`DataOperations`] implementation.
#[derive(Debug, Clone)]
pub enum DataSource {
    /// A file on disk. `format: None` presumably triggers auto-detection —
    /// confirm with the `DataOperations` implementation in use.
    File {
        path: String,
        format: Option<String>,
        options: HashMap<String, String>,
    },
    /// A named stream from a registered streaming provider.
    Stream {
        provider: String,
        stream_name: String,
        options: HashMap<String, String>,
    },
    /// A remote resource fetched by URL.
    Url {
        url: String,
        format: Option<String>,
        options: HashMap<String, String>,
    },
    /// An in-memory byte buffer in a known format.
    Memory {
        data: Vec<u8>,
        format: String,
    },
}
/// Where data is written to by a [`DataOperations`] implementation.
#[derive(Debug, Clone)]
pub enum DataDestination {
    /// A file on disk. `format: None` presumably triggers auto-detection —
    /// confirm with the `DataOperations` implementation in use.
    File {
        path: String,
        format: Option<String>,
        options: HashMap<String, String>,
    },
    /// A named stream on a registered streaming provider.
    Stream {
        provider: String,
        stream_name: String,
        options: HashMap<String, String>,
    },
    /// An in-memory buffer in the given format.
    Memory {
        format: String,
        options: HashMap<String, String>,
    },
}
/// An ordered sequence of transformation stages applied to a data flow.
#[derive(Debug, Clone)]
pub struct TransformPipeline {
    /// Stages executed in order.
    pub stages: Vec<TransformStage>,
    /// Pipeline-wide options (implementation-defined keys).
    pub options: HashMap<String, String>,
}
/// A single step in a [`TransformPipeline`]. Conditions and expressions
/// are carried as strings and interpreted by the executing engine.
#[derive(Debug, Clone)]
pub enum TransformStage {
    /// Keep only rows matching `condition`.
    Filter {
        condition: String,
    },
    /// Apply `expression` to the data.
    Map {
        expression: String,
    },
    /// Group by the listed columns and apply per-column aggregations
    /// (map of column name → aggregation function name, e.g. "avg").
    Aggregate {
        group_by: Vec<String>,
        aggregations: HashMap<String, String>,
    },
    /// Join with another source on the given key columns.
    Join {
        other_source: DataSource,
        on: Vec<String>,
        join_type: JoinType,
    },
    /// Windowed computation over partitions ordered by `order_by`.
    Window {
        partition_by: Vec<String>,
        order_by: Vec<String>,
        window_spec: String,
    },
    /// Engine-specific stage identified by `name`.
    Custom {
        name: String,
        parameters: HashMap<String, String>,
    },
}
/// Join semantics for [`TransformStage::Join`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JoinType {
    Inner,
    Left,
    Right,
    Full,
    Cross,
    /// Left rows that have at least one match on the right.
    LeftSemi,
    /// Left rows that have no match on the right.
    LeftAnti,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal `FileFormat` implementation used to exercise the registry.
    struct MockFormat;

    impl FileFormat for MockFormat {
        fn format_name(&self) -> &'static str {
            "mock"
        }

        fn file_extensions(&self) -> Vec<&'static str> {
            vec!["mock", "test"]
        }

        fn mime_types(&self) -> Vec<&'static str> {
            vec!["application/mock"]
        }

        // Claims any path whose extension is one of `file_extensions()`.
        fn can_handle_file(&self, path: &Path) -> bool {
            match path.extension().and_then(|ext| ext.to_str()) {
                Some(ext) => self.file_extensions().contains(&ext),
                None => false,
            }
        }

        // Recognizes byte buffers by a "MOCK" magic prefix.
        fn can_handle_data(&self, data: &[u8]) -> bool {
            data.starts_with(b"MOCK")
        }

        fn read_from_path_with_options(
            &self,
            _path: &Path,
            _options: &HashMap<String, String>,
        ) -> Result<DataFrame> {
            Err(Error::InvalidOperation(
                "Mock format read not implemented".to_string(),
            ))
        }

        fn read_from_bytes_with_options(
            &self,
            _data: &[u8],
            _options: &HashMap<String, String>,
        ) -> Result<DataFrame> {
            Err(Error::InvalidOperation(
                "Mock format read not implemented".to_string(),
            ))
        }

        fn write_to_path_with_options(
            &self,
            _df: &DataFrame,
            _path: &Path,
            _options: &HashMap<String, String>,
        ) -> Result<()> {
            Err(Error::InvalidOperation(
                "Mock format write not implemented".to_string(),
            ))
        }

        fn write_to_bytes_with_options(
            &self,
            _df: &DataFrame,
            _options: &HashMap<String, String>,
        ) -> Result<Vec<u8>> {
            Err(Error::InvalidOperation(
                "Mock format write not implemented".to_string(),
            ))
        }

        fn get_metadata(&self, _path: &Path) -> Result<HashMap<String, String>> {
            Ok(HashMap::new())
        }

        fn validate_options(&self, _options: &HashMap<String, String>) -> Result<()> {
            Ok(())
        }

        fn default_options(&self) -> HashMap<String, String> {
            HashMap::new()
        }

        fn capabilities(&self) -> FormatCapabilities {
            FormatCapabilities {
                can_read: true,
                can_write: true,
                supports_streaming: false,
                supports_schema_evolution: false,
                supports_compression: false,
                supports_encryption: false,
                supports_random_access: false,
                supports_append: false,
                max_file_size: None,
                supported_types: vec![
                    FormatDataType::Boolean,
                    FormatDataType::Integer,
                    FormatDataType::Float,
                    FormatDataType::String,
                ],
            }
        }
    }

    #[test]
    fn test_format_registry() {
        let mut registry = FormatRegistry::new();
        registry.register_format(MockFormat);

        // Lookup by name succeeds only for registered formats.
        assert!(registry.get_format("mock").is_some());
        assert!(registry.get_format("nonexistent").is_none());

        // Listing and capability maps include the registered format.
        assert!(registry.list_formats().contains(&"mock".to_string()));
        assert!(registry.get_all_capabilities().contains_key("mock"));
    }

    #[test]
    fn test_format_detection() {
        let mut registry = FormatRegistry::new();
        registry.register_format(MockFormat);

        // Detection by path extension.
        let by_path = registry.detect_format(Path::new("test.mock"));
        assert!(by_path.is_some());
        assert_eq!(
            by_path.expect("operation should succeed").format_name(),
            "mock"
        );

        // Detection by magic bytes.
        let by_data = registry.detect_format_from_data(b"MOCK format data");
        assert!(by_data.is_some());
        assert_eq!(
            by_data.expect("operation should succeed").format_name(),
            "mock"
        );

        // Unrecognized bytes yield no format.
        assert!(registry.detect_format_from_data(b"Not mock data").is_none());
    }

    #[test]
    fn test_data_source() {
        let source = DataSource::File {
            path: "test.csv".to_string(),
            format: Some("csv".to_string()),
            options: HashMap::new(),
        };

        if let DataSource::File { path, format, .. } = source {
            assert_eq!(path, "test.csv");
            assert_eq!(format, Some("csv".to_string()));
        } else {
            panic!("Expected File source");
        }
    }

    #[test]
    fn test_transform_pipeline() {
        let mut aggregations = HashMap::new();
        aggregations.insert("salary".to_string(), "avg".to_string());
        aggregations.insert("count".to_string(), "count".to_string());

        let pipeline = TransformPipeline {
            stages: vec![
                TransformStage::Filter {
                    condition: "age > 18".to_string(),
                },
                TransformStage::Aggregate {
                    group_by: vec!["department".to_string()],
                    aggregations,
                },
            ],
            options: HashMap::new(),
        };
        assert_eq!(pipeline.stages.len(), 2);

        if let TransformStage::Filter { condition } = &pipeline.stages[0] {
            assert_eq!(condition, "age > 18");
        } else {
            panic!("Expected Filter stage");
        }

        if let TransformStage::Aggregate {
            group_by,
            aggregations,
        } = &pipeline.stages[1]
        {
            assert_eq!(group_by, &vec!["department".to_string()]);
            assert_eq!(aggregations.len(), 2);
            assert_eq!(aggregations.get("salary"), Some(&"avg".to_string()));
        } else {
            panic!("Expected Aggregate stage");
        }
    }
}