pub mod error;
pub mod parser;
pub mod streaming;
pub mod transform;
pub mod utf8_utils;
pub use ddex_core::models::versions::ERNVersion;
use parser::security::SecurityConfig;
use serde::{Deserialize, Serialize};
use streaming::{StreamingConfig, WorkingStreamIterator};
#[cfg(feature = "zero-copy")]
use streaming::fast_zero_copy::FastZeroCopyIterator;
use streaming::parallel_parser::ParallelStreamingIterator;
#[derive(Debug, Clone)]
pub struct DDEXParser {
config: SecurityConfig,
}
impl Default for DDEXParser {
fn default() -> Self {
Self::new()
}
}
impl DDEXParser {
pub fn new() -> Self {
Self {
config: SecurityConfig::default(),
}
}
pub fn with_config(config: SecurityConfig) -> Self {
Self { config }
}
pub fn parse<R: std::io::BufRead + std::io::Seek>(
&mut self,
reader: R,
) -> Result<ddex_core::models::flat::ParsedERNMessage, error::ParseError> {
if self.config.enable_fast_streaming {
return self.parse_fast_streaming(reader);
}
self.parse_with_options(reader, Default::default())
}
pub fn parse_with_options<R: std::io::BufRead + std::io::Seek>(
&mut self,
reader: R,
options: parser::ParseOptions,
) -> Result<ddex_core::models::flat::ParsedERNMessage, error::ParseError> {
if self.config.enable_fast_streaming {
return self.parse_fast_streaming(reader);
}
parser::parse(reader, options, &self.config)
}
pub fn stream<R: std::io::BufRead>(&self, reader: R) -> WorkingStreamIterator<R> {
let version = ddex_core::models::versions::ERNVersion::V4_3;
WorkingStreamIterator::new(reader, version)
}
pub fn stream_with_version_detection<R: std::io::BufRead + std::io::Seek>(
&self,
mut reader: R,
) -> Result<WorkingStreamIterator<R>, error::ParseError> {
let version = parser::detector::VersionDetector::detect(&mut reader)?;
reader.seek(std::io::SeekFrom::Start(0))?;
Ok(WorkingStreamIterator::new(reader, version))
}
#[cfg(feature = "zero-copy")]
pub fn stream_zero_copy<R: std::io::BufRead>(&self, reader: R) -> FastZeroCopyIterator<R> {
let version = ddex_core::models::versions::ERNVersion::V4_3;
FastZeroCopyIterator::new(reader, version)
}
#[cfg(feature = "zero-copy")]
pub fn stream_zero_copy_with_version_detection<R: std::io::BufRead + std::io::Seek>(
&self,
mut reader: R,
) -> Result<FastZeroCopyIterator<R>, error::ParseError> {
let version = parser::detector::VersionDetector::detect(&mut reader)?;
reader.seek(std::io::SeekFrom::Start(0))?;
Ok(FastZeroCopyIterator::new(reader, version))
}
pub fn stream_parallel<R: std::io::BufRead>(&self, reader: R) -> ParallelStreamingIterator<R> {
let version = ddex_core::models::versions::ERNVersion::V4_3;
ParallelStreamingIterator::new(reader, version)
}
pub fn stream_parallel_with_threads<R: std::io::BufRead>(
&self,
reader: R,
threads: usize,
) -> ParallelStreamingIterator<R> {
let version = ddex_core::models::versions::ERNVersion::V4_3;
ParallelStreamingIterator::with_threads(reader, version, threads)
}
pub fn stream_parallel_with_version_detection<R: std::io::BufRead + std::io::Seek>(
&self,
mut reader: R,
) -> Result<ParallelStreamingIterator<R>, error::ParseError> {
let version = parser::detector::VersionDetector::detect(&mut reader)?;
reader.seek(std::io::SeekFrom::Start(0))?;
Ok(ParallelStreamingIterator::new(reader, version))
}
pub fn parse_fast_streaming<R: std::io::BufRead>(
&mut self,
mut reader: R,
) -> Result<ddex_core::models::flat::ParsedERNMessage, error::ParseError> {
use crate::streaming::fast_streaming_parser::{FastElementType, FastStreamingParser};
let streaming_config = StreamingConfig {
security: self.config.clone(),
buffer_size: 64 * 1024, max_memory: 200 * 1024 * 1024, chunk_size: 512, enable_progress: false, progress_interval: 0,
};
let mut fast_parser = FastStreamingParser::new(streaming_config);
let iterator = fast_parser.parse_streaming(&mut reader, None)?;
let mut release_count = 0;
let mut _resource_count = 0;
for (_total_elements, element) in iterator.enumerate() {
match element.element_type {
FastElementType::Release => {
release_count += 1;
}
FastElementType::Resource => {
_resource_count += 1;
}
_ => {} }
}
use ddex_core::models::common::{Identifier, IdentifierType, LocalizedString};
use ddex_core::models::flat::{
FlattenedMessage, MessageStats, Organization, ParsedERNMessage,
};
use ddex_core::models::graph::{
ERNMessage, MessageControlType, MessageHeader, MessageRecipient, MessageSender,
MessageType,
};
use ddex_core::models::versions::ERNVersion;
use indexmap::IndexMap;
let flat_message = FlattenedMessage {
message_id: "FAST_STREAMING_MESSAGE".to_string(),
message_type: "NewReleaseMessage".to_string(),
message_date: chrono::Utc::now(),
sender: Organization {
name: "Fast Streaming Parser".to_string(),
id: "FAST_PARSER".to_string(),
extensions: None,
},
recipient: Organization {
name: "Streaming Service Recipient".to_string(),
id: "STREAMING_SERVICE_RECIPIENT".to_string(),
extensions: None,
},
releases: Vec::new(), resources: IndexMap::new(), deals: Vec::new(),
parties: IndexMap::new(),
version: "4.3".to_string(),
profile: None,
stats: MessageStats {
release_count,
track_count: 0,
deal_count: 0,
total_duration: 0,
},
extensions: None,
};
let graph_message = ERNMessage {
message_header: MessageHeader {
message_id: "FAST_STREAMING_MESSAGE".to_string(),
message_type: MessageType::NewReleaseMessage,
message_created_date_time: chrono::Utc::now(),
message_sender: MessageSender {
party_id: vec![Identifier {
id_type: IdentifierType::Proprietary,
value: "FAST_PARSER".to_string(),
namespace: Some("PADPIDA".to_string()),
}],
party_name: vec![LocalizedString {
text: "Fast Streaming Parser".to_string(),
language_code: Some("en".to_string()),
script: None,
}],
trading_name: None,
attributes: None,
extensions: None,
comments: None,
},
message_recipient: MessageRecipient {
party_id: vec![Identifier {
id_type: IdentifierType::Proprietary,
value: "STREAMING_SERVICE_RECIPIENT".to_string(),
namespace: Some("PADPIDA".to_string()),
}],
party_name: vec![LocalizedString {
text: "Streaming Service Recipient".to_string(),
language_code: Some("en".to_string()),
script: None,
}],
trading_name: None,
attributes: None,
extensions: None,
comments: None,
},
message_control_type: Some(MessageControlType::LiveMessage),
message_thread_id: None,
attributes: None,
extensions: None,
comments: None,
},
parties: Vec::new(),
resources: Vec::new(),
releases: Vec::new(),
deals: Vec::new(),
version: ERNVersion::V4_3,
profile: None,
message_audit_trail: None,
attributes: None,
extensions: None,
legacy_extensions: None,
comments: None,
};
let message = ParsedERNMessage {
graph: graph_message,
flat: flat_message,
extensions: None,
};
Ok(message)
}
pub fn detect_version<R: std::io::BufRead>(
&self,
reader: R,
) -> Result<ddex_core::models::versions::ERNVersion, error::ParseError> {
parser::detector::VersionDetector::detect(reader)
}
pub fn sanity_check<R: std::io::BufRead>(
&self,
_reader: R,
) -> Result<SanityCheckResult, error::ParseError> {
Ok(SanityCheckResult {
is_valid: true,
version: ddex_core::models::versions::ERNVersion::V4_3,
errors: Vec::new(),
warnings: Vec::new(),
})
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SanityCheckResult {
pub is_valid: bool,
pub version: ddex_core::models::versions::ERNVersion,
pub errors: Vec<String>,
pub warnings: Vec<String>,
}
#[cfg(feature = "bench")]
pub mod bench_report;
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_parser_creation() {
let parser = DDEXParser::new();
assert!(parser.config.disable_external_entities);
}
}
#[cfg(test)]
mod api_integration_test;