1pub mod error;
4pub mod parser;
5pub mod streaming;
6pub mod transform;
7pub mod utf8_utils;
8
9pub use ddex_core::models::versions::ERNVersion;
11
12use parser::security::SecurityConfig;
13use serde::{Deserialize, Serialize};
14use streaming::{StreamingConfig, WorkingStreamIterator};
15
16#[cfg(feature = "zero-copy")]
17use streaming::fast_zero_copy::FastZeroCopyIterator;
18
19use streaming::parallel_parser::ParallelStreamingIterator;
20
21#[derive(Debug, Clone)]
23pub struct DDEXParser {
24 config: SecurityConfig,
25}
26
27impl Default for DDEXParser {
28 fn default() -> Self {
29 Self::new()
30 }
31}
32
33impl DDEXParser {
34 pub fn new() -> Self {
36 Self {
37 config: SecurityConfig::default(),
38 }
39 }
40
41 pub fn with_config(config: SecurityConfig) -> Self {
43 Self { config }
44 }
45
46 pub fn parse<R: std::io::BufRead + std::io::Seek>(
48 &mut self,
49 reader: R,
50 ) -> Result<ddex_core::models::flat::ParsedERNMessage, error::ParseError> {
51 if self.config.enable_fast_streaming {
53 return self.parse_fast_streaming(reader);
54 }
55
56 self.parse_with_options(reader, Default::default())
58 }
59
60 pub fn parse_with_options<R: std::io::BufRead + std::io::Seek>(
62 &mut self,
63 reader: R,
64 options: parser::ParseOptions,
65 ) -> Result<ddex_core::models::flat::ParsedERNMessage, error::ParseError> {
66 if self.config.enable_fast_streaming {
68 return self.parse_fast_streaming(reader);
69 }
70
71 parser::parse(reader, options, &self.config)
75 }
76
77 pub fn stream<R: std::io::BufRead>(&self, reader: R) -> WorkingStreamIterator<R> {
79 let version = ddex_core::models::versions::ERNVersion::V4_3;
82
83 WorkingStreamIterator::new(reader, version)
84 }
85
86 pub fn stream_with_version_detection<R: std::io::BufRead + std::io::Seek>(
88 &self,
89 mut reader: R,
90 ) -> Result<WorkingStreamIterator<R>, error::ParseError> {
91 let version = parser::detector::VersionDetector::detect(&mut reader)?;
93 reader.seek(std::io::SeekFrom::Start(0))?;
94
95 Ok(WorkingStreamIterator::new(reader, version))
96 }
97
/// Wraps `reader` in a [`FastZeroCopyIterator`].
///
/// Only compiled with the `zero-copy` feature. No version detection takes
/// place: the iterator is always created with `ERNVersion::V4_3`.
#[cfg(feature = "zero-copy")]
pub fn stream_zero_copy<R: std::io::BufRead>(&self, reader: R) -> FastZeroCopyIterator<R> {
    FastZeroCopyIterator::new(reader, ddex_core::models::versions::ERNVersion::V4_3)
}
104
/// Detects the ERN version of the input, then wraps `reader` in a
/// [`FastZeroCopyIterator`] configured with that version.
///
/// Only compiled with the `zero-copy` feature.
///
/// # Errors
/// Returns an [`error::ParseError`] if version detection fails or the
/// reader cannot be repositioned to its start.
#[cfg(feature = "zero-copy")]
pub fn stream_zero_copy_with_version_detection<R: std::io::BufRead + std::io::Seek>(
    &self,
    mut reader: R,
) -> Result<FastZeroCopyIterator<R>, error::ParseError> {
    let detected = parser::detector::VersionDetector::detect(&mut reader)?;

    // The detector was handed the reader, so go back to offset 0 before
    // the iterator starts reading.
    reader.rewind()?;

    Ok(FastZeroCopyIterator::new(reader, detected))
}
116
117 pub fn stream_parallel<R: std::io::BufRead>(&self, reader: R) -> ParallelStreamingIterator<R> {
119 let version = ddex_core::models::versions::ERNVersion::V4_3;
120 ParallelStreamingIterator::new(reader, version)
121 }
122
123 pub fn stream_parallel_with_threads<R: std::io::BufRead>(
125 &self,
126 reader: R,
127 threads: usize,
128 ) -> ParallelStreamingIterator<R> {
129 let version = ddex_core::models::versions::ERNVersion::V4_3;
130 ParallelStreamingIterator::with_threads(reader, version, threads)
131 }
132
133 pub fn stream_parallel_with_version_detection<R: std::io::BufRead + std::io::Seek>(
135 &self,
136 mut reader: R,
137 ) -> Result<ParallelStreamingIterator<R>, error::ParseError> {
138 let version = parser::detector::VersionDetector::detect(&mut reader)?;
139 reader.seek(std::io::SeekFrom::Start(0))?;
140
141 Ok(ParallelStreamingIterator::new(reader, version))
142 }
143
144 pub fn parse_fast_streaming<R: std::io::BufRead>(
146 &mut self,
147 mut reader: R,
148 ) -> Result<ddex_core::models::flat::ParsedERNMessage, error::ParseError> {
149 use crate::streaming::fast_streaming_parser::{FastElementType, FastStreamingParser};
150
151 let streaming_config = StreamingConfig {
153 security: self.config.clone(),
154 buffer_size: 64 * 1024, max_memory: 200 * 1024 * 1024, chunk_size: 512, enable_progress: false, progress_interval: 0,
159 };
160
161 let mut fast_parser = FastStreamingParser::new(streaming_config);
163
164 let iterator = fast_parser.parse_streaming(&mut reader, None)?;
166
167 let mut release_count = 0;
169 let mut _resource_count = 0;
170
171 for (_total_elements, element) in iterator.enumerate() {
172 match element.element_type {
173 FastElementType::Release => {
174 release_count += 1;
175 }
176 FastElementType::Resource => {
177 _resource_count += 1;
178 }
179 _ => {} }
181 }
182
183 use ddex_core::models::common::{Identifier, IdentifierType, LocalizedString};
185 use ddex_core::models::flat::{
186 FlattenedMessage, MessageStats, Organization, ParsedERNMessage,
187 };
188 use ddex_core::models::graph::{
189 ERNMessage, MessageControlType, MessageHeader, MessageRecipient, MessageSender,
190 MessageType,
191 };
192 use ddex_core::models::versions::ERNVersion;
193 use indexmap::IndexMap;
194
195 let flat_message = FlattenedMessage {
197 message_id: "FAST_STREAMING_MESSAGE".to_string(),
198 message_type: "NewReleaseMessage".to_string(),
199 message_date: chrono::Utc::now(),
200 sender: Organization {
201 name: "Fast Streaming Parser".to_string(),
202 id: "FAST_PARSER".to_string(),
203 extensions: None,
204 },
205 recipient: Organization {
206 name: "Test Recipient".to_string(),
207 id: "TEST_RECIPIENT".to_string(),
208 extensions: None,
209 },
210 releases: Vec::new(), resources: IndexMap::new(), deals: Vec::new(),
213 parties: IndexMap::new(),
214 version: "4.3".to_string(),
215 profile: None,
216 stats: MessageStats {
217 release_count,
218 track_count: 0,
219 deal_count: 0,
220 total_duration: 0,
221 },
222 extensions: None,
223 };
224
225 let graph_message = ERNMessage {
227 message_header: MessageHeader {
228 message_id: "FAST_STREAMING_MESSAGE".to_string(),
229 message_type: MessageType::NewReleaseMessage,
230 message_created_date_time: chrono::Utc::now(),
231 message_sender: MessageSender {
232 party_id: vec![Identifier {
233 id_type: IdentifierType::Proprietary,
234 value: "FAST_PARSER".to_string(),
235 namespace: Some("PADPIDA".to_string()),
236 }],
237 party_name: vec![LocalizedString {
238 text: "Fast Streaming Parser".to_string(),
239 language_code: Some("en".to_string()),
240 script: None,
241 }],
242 trading_name: None,
243 attributes: None,
244 extensions: None,
245 comments: None,
246 },
247 message_recipient: MessageRecipient {
248 party_id: vec![Identifier {
249 id_type: IdentifierType::Proprietary,
250 value: "TEST_RECIPIENT".to_string(),
251 namespace: Some("PADPIDA".to_string()),
252 }],
253 party_name: vec![LocalizedString {
254 text: "Test Recipient".to_string(),
255 language_code: Some("en".to_string()),
256 script: None,
257 }],
258 trading_name: None,
259 attributes: None,
260 extensions: None,
261 comments: None,
262 },
263 message_control_type: Some(MessageControlType::LiveMessage),
264 message_thread_id: None,
265 attributes: None,
266 extensions: None,
267 comments: None,
268 },
269 parties: Vec::new(),
270 resources: Vec::new(),
271 releases: Vec::new(),
272 deals: Vec::new(),
273 version: ERNVersion::V4_3,
274 profile: None,
275 message_audit_trail: None,
276 attributes: None,
277 extensions: None,
278 legacy_extensions: None,
279 comments: None,
280 };
281
282 let message = ParsedERNMessage {
283 graph: graph_message,
284 flat: flat_message,
285 extensions: None,
286 };
287
288 Ok(message)
289 }
290
291 pub fn detect_version<R: std::io::BufRead>(
293 &self,
294 reader: R,
295 ) -> Result<ddex_core::models::versions::ERNVersion, error::ParseError> {
296 parser::detector::VersionDetector::detect(reader)
297 }
298
299 pub fn sanity_check<R: std::io::BufRead>(
301 &self,
302 _reader: R,
303 ) -> Result<SanityCheckResult, error::ParseError> {
304 Ok(SanityCheckResult {
306 is_valid: true,
307 version: ddex_core::models::versions::ERNVersion::V4_3,
308 errors: Vec::new(),
309 warnings: Vec::new(),
310 })
311 }
312}
313
314#[derive(Debug, Clone, Serialize, Deserialize)]
318pub struct SanityCheckResult {
319 pub is_valid: bool,
320 pub version: ddex_core::models::versions::ERNVersion,
321 pub errors: Vec<String>,
322 pub warnings: Vec<String>,
323}
324
325#[cfg(feature = "bench")]
327pub mod bench_report;
328
#[cfg(test)]
mod tests {
    use super::*;

    /// A parser built with `new()` must have external entities disabled.
    #[test]
    fn test_parser_creation() {
        let DDEXParser { config } = DDEXParser::new();
        assert!(config.disable_external_entities);
    }
}
339
340#[cfg(test)]
341mod api_integration_test;