use crate::error::{Result, SammError};
use crate::metamodel::{Aspect, Operation, Property};
use crate::parser::SammTurtleParser;
use async_stream::stream;
use futures::stream::Stream;
use serde::{Deserialize, Serialize};
use std::path::{Path, PathBuf};
use std::pin::Pin;
use tokio::fs::File;
use tokio::io::{AsyncReadExt, AsyncSeekExt, BufReader};
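
/// Events emitted by [`IncrementalParser::parse_with_events`] while a SAMM
/// Turtle file is being parsed incrementally.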
#[derive(Debug, Clone)]
pub enum ParseEvent {
Started {
total_bytes: u64,
},
Progress {
bytes_parsed: u64,
total_bytes: u64,
},
PropertyParsed {
property: Property,
},
OperationParsed {
operation: Operation,
},
PrefixParsed {
prefix: String,
namespace: String,
},
MetadataParsed {
language: String,
metadata_type: String,
},
Completed {
aspect: Aspect,
},
Error {
error: String,
},
}
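
/// Serializable snapshot of an in-progress parse. It records the byte offset
/// reached in the source file and can be persisted with
/// [`ParseState::save_to_file`] and restored with [`ParseState::load_from_file`]
/// to resume parsing later.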
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ParseState {
pub file_path: PathBuf,
pub byte_offset: u64,
pub partial_aspect: Option<Aspect>,
pub properties_parsed: usize,
pub operations_parsed: usize,
pub last_saved: Option<String>,
}
impl ParseState {
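    /// Creates a fresh state for `file_path` with the byte offset and all
    /// counters at zero.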
pub fn new(file_path: impl Into<PathBuf>) -> Self {
Self {
file_path: file_path.into(),
byte_offset: 0,
partial_aspect: None,
properties_parsed: 0,
operations_parsed: 0,
last_saved: None,
}
}
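    /// Returns how far parsing has progressed, as a percentage of the file's
    /// current size on disk (100% for an empty file).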
pub async fn progress_percentage(&self) -> Result<f64> {
let metadata = tokio::fs::metadata(&self.file_path).await?;
let total_bytes = metadata.len();
if total_bytes == 0 {
return Ok(100.0);
}
Ok((self.byte_offset as f64 / total_bytes as f64) * 100.0)
}
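    /// Persists this state as pretty-printed JSON, updating `last_saved` with
    /// the current UTC timestamp.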
pub async fn save_to_file(&mut self, path: impl AsRef<Path>) -> Result<()> {
use tokio::io::AsyncWriteExt;
self.last_saved = Some(chrono::Utc::now().to_rfc3339());
let json = serde_json::to_string_pretty(self)
.map_err(|e| SammError::Other(format!("JSON error: {}", e)))?;
let mut file = File::create(path).await?;
file.write_all(json.as_bytes()).await?;
file.flush().await?;
Ok(())
}
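    /// Loads a previously saved state from a JSON file.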
pub async fn load_from_file(path: impl AsRef<Path>) -> Result<Self> {
let contents = tokio::fs::read_to_string(path).await?;
let state: ParseState = serde_json::from_str(&contents)
.map_err(|e| SammError::Other(format!("JSON error: {}", e)))?;
Ok(state)
}
}
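
/// Parser that reads a SAMM Turtle file in fixed-size chunks and exposes the
/// parse as a stream of [`ParseEvent`]s. It carries a [`ParseState`] that can
/// be persisted and later used to resume from a saved byte offset.
///
/// A minimal usage sketch (the crate path `samm_rs` is an assumption, not part
/// of this module):
///
/// ```ignore
/// use futures::StreamExt;
/// // Hypothetical crate path; adjust to wherever this module is exposed.
/// use samm_rs::incremental::{IncrementalParser, ParseEvent};
///
/// let mut parser = IncrementalParser::new("model.ttl").with_chunk_size(32 * 1024);
/// let mut events = parser.parse_with_events().await?;
/// while let Some(event) = events.next().await {
///     if let ParseEvent::Progress { bytes_parsed, total_bytes } = event {
///         println!("{bytes_parsed}/{total_bytes} bytes");
///     }
/// }
/// ```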
pub struct IncrementalParser {
file_path: PathBuf,
chunk_size: usize,
state: ParseState,
}
impl IncrementalParser {
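    /// Creates a parser for `file_path` with the default 64 KiB chunk size.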
pub fn new(file_path: impl Into<PathBuf>) -> Self {
let file_path = file_path.into();
let state = ParseState::new(file_path.clone());
Self {
file_path,
            chunk_size: 64 * 1024,
            state,
}
}
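    /// Resumes a parser from a previously saved [`ParseState`], picking up at
    /// the recorded byte offset.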
pub fn from_state(state: ParseState) -> Self {
Self {
file_path: state.file_path.clone(),
chunk_size: 64 * 1024,
state,
}
}
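    /// Sets the read chunk size in bytes (clamped to a minimum of 1024).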
pub fn with_chunk_size(mut self, size: usize) -> Self {
self.chunk_size = size.max(1024);
self
}
pub fn state(&self) -> &ParseState {
&self.state
}
pub fn state_mut(&mut self) -> &mut ParseState {
&mut self.state
}
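    /// Saves the current parse state to `path` so parsing can be resumed later.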
pub async fn save_state(&mut self, path: impl AsRef<Path>) -> Result<()> {
self.state.save_to_file(path).await
}
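    /// Parses the file as a stream of [`ParseEvent`]s, starting from the byte
    /// offset recorded in the current state. The stream always begins with
    /// `Started` and ends with either `Completed` or `Error`.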
pub async fn parse_with_events(
&mut self,
) -> Result<Pin<Box<dyn Stream<Item = ParseEvent> + Send>>> {
let metadata = tokio::fs::metadata(&self.file_path).await?;
let total_bytes = metadata.len();
let file = File::open(&self.file_path).await?;
let mut reader = BufReader::new(file);
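        // If resuming, skip ahead to the byte offset recorded in the saved state.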
if self.state.byte_offset > 0 {
reader
.seek(tokio::io::SeekFrom::Start(self.state.byte_offset))
.await?;
}
let chunk_size = self.chunk_size;
let mut current_offset = self.state.byte_offset;
let partial_aspect = self.state.partial_aspect.clone();
let stream = stream! {
yield ParseEvent::Started { total_bytes };
let mut buffer = Vec::new();
let mut accumulated = String::new();
let mut aspect_result: Option<Aspect> = partial_aspect;
loop {
buffer.clear();
buffer.resize(chunk_size, 0);
match reader.read(&mut buffer).await {
                    Ok(0) => break,
                    Ok(n) => {
current_offset += n as u64;
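                        // Decode the chunk as UTF-8; a chunk that is not valid
                        // UTF-8 on its own (e.g. a multi-byte character split
                        // across a chunk boundary) is skipped without being
                        // accumulated.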
if let Ok(chunk_str) = String::from_utf8(buffer[..n].to_vec()) {
accumulated.push_str(&chunk_str);
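                            // Heuristic: a '.' or ';' suggests at least one
                            // complete Turtle statement, so attempt a parse now.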
if accumulated.contains('.') || accumulated.contains(';') {
match SammTurtleParser::new().parse_string(&accumulated, "urn:samm:").await {
Ok(aspect) => {
if let Some(ref asp) = aspect_result {
for prop in &aspect.properties {
if !asp.properties.iter().any(|p| p.metadata.urn == prop.metadata.urn) {
yield ParseEvent::PropertyParsed {
property: prop.clone(),
};
}
}
for op in &aspect.operations {
if !asp.operations.iter().any(|o| o.metadata.urn == op.metadata.urn) {
yield ParseEvent::OperationParsed {
operation: op.clone(),
};
}
}
} else {
for prop in &aspect.properties {
yield ParseEvent::PropertyParsed {
property: prop.clone(),
};
}
for op in &aspect.operations {
yield ParseEvent::OperationParsed {
operation: op.clone(),
};
}
}
aspect_result = Some(aspect);
accumulated.clear();
}
Err(_) => {
}
}
}
yield ParseEvent::Progress {
bytes_parsed: current_offset,
total_bytes,
};
}
}
Err(e) => {
yield ParseEvent::Error {
error: format!("IO error: {}", e),
};
break;
}
}
}
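            // If nothing parsed successfully during streaming, make one final
            // attempt on everything that was accumulated.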
if !accumulated.is_empty() && aspect_result.is_none() {
match SammTurtleParser::new().parse_string(&accumulated, "urn:samm:").await {
Ok(aspect) => {
aspect_result = Some(aspect);
}
Err(e) => {
yield ParseEvent::Error { error: e.to_string() };
}
}
}
if let Some(aspect) = aspect_result {
yield ParseEvent::Completed { aspect };
} else {
yield ParseEvent::Error {
error: "Failed to parse aspect".to_string(),
};
}
};
Ok(Box::pin(stream))
}
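    /// Convenience wrapper around [`Self::parse_with_events`] that discards
    /// intermediate events and returns only the completed aspect.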
pub async fn parse(&mut self) -> Result<Aspect> {
use futures::StreamExt;
let mut events = self.parse_with_events().await?;
let mut result = None;
while let Some(event) = events.next().await {
if let ParseEvent::Completed { aspect } = event {
result = Some(aspect);
break;
} else if let ParseEvent::Error { error } = event {
return Err(SammError::ParseError(error));
}
}
result.ok_or_else(|| SammError::ParseError("No aspect parsed".to_string()))
}
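    /// Like [`Self::parse`], but invokes `callback(bytes_parsed, total_bytes)`
    /// on every progress event; returning `false` from the callback cancels
    /// parsing.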
pub async fn parse_with_progress<F>(&mut self, mut callback: F) -> Result<Aspect>
where
F: FnMut(u64, u64) -> bool + Send,
{
use futures::StreamExt;
let mut events = self.parse_with_events().await?;
let mut result = None;
while let Some(event) = events.next().await {
match event {
ParseEvent::Progress {
bytes_parsed,
total_bytes,
} => {
if !callback(bytes_parsed, total_bytes) {
return Err(SammError::Other("Parsing cancelled by user".to_string()));
}
}
ParseEvent::Completed { aspect } => {
result = Some(aspect);
break;
}
ParseEvent::Error { error } => {
return Err(SammError::ParseError(error));
}
_ => {}
}
}
result.ok_or_else(|| SammError::ParseError("No aspect parsed".to_string()))
}
}
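
/// A default parser points at an empty path, so attempting to parse will fail
/// because no file can be opened; use [`IncrementalParser::new`] with a real
/// path for actual parsing.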
impl Default for IncrementalParser {
fn default() -> Self {
Self::new("")
}
}
#[cfg(test)]
mod tests {
use super::*;
use futures::StreamExt;
use std::io::Write;
use tempfile::NamedTempFile;
#[tokio::test]
async fn test_parse_state_creation() {
let state = ParseState::new("/tmp/test.ttl");
assert_eq!(state.byte_offset, 0);
assert_eq!(state.properties_parsed, 0);
assert!(state.partial_aspect.is_none());
}
#[tokio::test]
async fn test_parse_state_progress() {
let mut temp_file = NamedTempFile::new().expect("temp file creation should succeed");
write!(temp_file, "test content").expect("write should succeed");
temp_file.flush().expect("flush should succeed");
let mut state = ParseState::new(temp_file.path());
state.byte_offset = 6;
let progress = state
.progress_percentage()
.await
.expect("async operation should succeed");
assert!((progress - 50.0).abs() < 0.1);
}
#[tokio::test]
async fn test_parse_state_save_load() {
let temp_state_file = NamedTempFile::new().expect("temp file creation should succeed");
let temp_data_file = NamedTempFile::new().expect("temp file creation should succeed");
let mut state = ParseState::new(temp_data_file.path());
state.byte_offset = 100;
state.properties_parsed = 5;
state
.save_to_file(temp_state_file.path())
.await
.expect("async operation should succeed");
let loaded_state = ParseState::load_from_file(temp_state_file.path())
.await
.expect("operation should succeed");
assert_eq!(loaded_state.byte_offset, 100);
assert_eq!(loaded_state.properties_parsed, 5);
}
#[tokio::test]
async fn test_incremental_parser_creation() {
let parser = IncrementalParser::new("/tmp/test.ttl");
assert_eq!(parser.state().byte_offset, 0);
}
#[tokio::test]
async fn test_incremental_parser_with_chunk_size() {
let parser = IncrementalParser::new("/tmp/test.ttl").with_chunk_size(1024);
assert_eq!(parser.chunk_size, 1024);
}
#[tokio::test]
async fn test_parse_events_simple() {
let mut temp_file = NamedTempFile::new().expect("temp file creation should succeed");
let ttl_content = r#"
@prefix samm: <urn:samm:org.eclipse.esmf.samm:meta-model:2.1.0#> .
@prefix : <urn:samm:org.example:1.0.0#> .
@prefix xsd: <http://www.w3.org/2001/XMLSchema#> .
:TestAspect a samm:Aspect ;
samm:properties ( :property1 ) ;
samm:operations ( ) .
:property1 a samm:Property ;
samm:characteristic :TestCharacteristic .
:TestCharacteristic a samm:Characteristic ;
samm:dataType xsd:string .
"#;
write!(temp_file, "{}", ttl_content).expect("write should succeed");
temp_file.flush().expect("flush should succeed");
let mut parser = IncrementalParser::new(temp_file.path());
let events = parser
.parse_with_events()
.await
.expect("async operation should succeed");
let collected: Vec<ParseEvent> = events.collect().await;
assert!(collected
.iter()
.any(|e| matches!(e, ParseEvent::Started { .. })));
assert!(!collected.is_empty(), "Should emit at least some events");
let has_completion = collected
.iter()
.any(|e| matches!(e, ParseEvent::Completed { .. } | ParseEvent::Error { .. }));
assert!(
has_completion,
"Should have either Completed or Error event"
);
}
}