1use crate::error::{PhysicsError, PhysicsResult};
7use chrono::{DateTime, Utc};
8use oxirs_core::model::NamedNode;
9use oxirs_core::rdf_store::RdfStore;
10use serde::{Deserialize, Serialize};
11use std::collections::HashMap;
12use std::sync::Arc;
13use uuid::Uuid;
14
15pub struct ResultInjector {
17 store: Option<Arc<RdfStore>>,
19
20 enable_provenance: bool,
22
23 config: InjectionConfig,
25}
26
27#[derive(Debug, Clone)]
29pub struct InjectionConfig {
30 pub physics_prefix: String,
32 pub prov_prefix: String,
34 pub batch_size: usize,
36 pub use_transactions: bool,
38}
39
40impl Default for InjectionConfig {
41 fn default() -> Self {
42 Self {
43 physics_prefix: "http://oxirs.org/physics#".to_string(),
44 prov_prefix: "http://www.w3.org/ns/prov#".to_string(),
45 batch_size: 1000,
46 use_transactions: true,
47 }
48 }
49}
50
51impl ResultInjector {
52 pub fn new() -> Self {
54 Self {
55 store: None,
56 enable_provenance: true,
57 config: InjectionConfig::default(),
58 }
59 }
60
61 pub fn with_store(store: Arc<RdfStore>) -> Self {
63 Self {
64 store: Some(store),
65 enable_provenance: true,
66 config: InjectionConfig::default(),
67 }
68 }
69
70 pub fn with_config(mut self, config: InjectionConfig) -> Self {
72 self.config = config;
73 self
74 }
75
76 pub fn without_provenance(mut self) -> Self {
78 self.enable_provenance = false;
79 self
80 }
81
82 pub async fn inject(&self, result: &SimulationResult) -> PhysicsResult<()> {
84 self.validate_result(result)?;
86
87 if let Some(ref store) = self.store {
88 self.inject_with_sparql(store, result).await?;
90 } else {
91 tracing::debug!(
93 "Would inject {} state vectors for entity {}",
94 result.state_trajectory.len(),
95 result.entity_iri
96 );
97 }
98
99 Ok(())
100 }
101
102 fn validate_result(&self, result: &SimulationResult) -> PhysicsResult<()> {
104 if result.entity_iri.is_empty() {
105 return Err(PhysicsError::ResultInjection(
106 "Entity IRI cannot be empty".to_string(),
107 ));
108 }
109
110 if result.simulation_run_id.is_empty() {
111 return Err(PhysicsError::ResultInjection(
112 "Simulation run ID cannot be empty".to_string(),
113 ));
114 }
115
116 if result.state_trajectory.is_empty() {
117 return Err(PhysicsError::ResultInjection(
118 "State trajectory cannot be empty".to_string(),
119 ));
120 }
121
122 Ok(())
123 }
124
125 async fn inject_with_sparql(
127 &self,
128 store: &RdfStore,
129 result: &SimulationResult,
130 ) -> PhysicsResult<()> {
131 let metadata_update = self.generate_metadata_update(result);
133
134 self.execute_update(store, &metadata_update).await?;
136
137 if result.state_trajectory.len() > self.config.batch_size {
139 self.inject_in_batches(store, result).await?;
140 } else {
141 let trajectory_update = self.generate_state_trajectory_update(result);
142 self.execute_update(store, &trajectory_update).await?;
143 }
144
145 if self.enable_provenance {
147 let provenance_update = self.generate_provenance_update(result);
148 self.execute_update(store, &provenance_update).await?;
149 }
150
151 Ok(())
152 }
153
154 async fn execute_update(&self, _store: &RdfStore, update_query: &str) -> PhysicsResult<()> {
156 tracing::debug!("SPARQL UPDATE:\n{}", update_query);
161
162 Ok(())
167 }
168
169 fn generate_metadata_update(&self, result: &SimulationResult) -> String {
171 let phys = &self.config.physics_prefix;
172
173 format!(
174 r#"
175 PREFIX phys: <{phys}>
176 PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
177
178 INSERT DATA {{
179 <{run_id}> a phys:SimulationRun .
180 <{run_id}> phys:simulatesEntity <{entity}> .
181 <{run_id}> phys:timestamp "{timestamp}"^^xsd:dateTime .
182 <{run_id}> phys:converged {converged}^^xsd:boolean .
183 <{run_id}> phys:iterations {iterations}^^xsd:integer .
184 <{run_id}> phys:finalResidual {residual}^^xsd:double .
185 }}
186 "#,
187 phys = phys,
188 run_id = result.simulation_run_id,
189 entity = result.entity_iri,
190 timestamp = result.timestamp.to_rfc3339(),
191 converged = result.convergence_info.converged,
192 iterations = result.convergence_info.iterations,
193 residual = result.convergence_info.final_residual,
194 )
195 }
196
197 fn generate_state_trajectory_update(&self, result: &SimulationResult) -> String {
199 let phys = &self.config.physics_prefix;
200 let mut triples = Vec::new();
201
202 let states_to_insert = result.state_trajectory.iter().take(100);
204
205 for (idx, state) in states_to_insert.enumerate() {
206 let state_id = format!("{}#state_{}", result.simulation_run_id, idx);
207
208 triples.push(format!(
209 "<{run_id}> phys:hasState <{state_id}> .",
210 run_id = result.simulation_run_id,
211 state_id = state_id
212 ));
213 triples.push(format!(
214 "<{state_id}> phys:time {time}^^xsd:double .",
215 state_id = state_id,
216 time = state.time
217 ));
218
219 for (key, value) in &state.state {
220 triples.push(format!(
221 "<{state_id}> phys:{key} {value}^^xsd:double .",
222 state_id = state_id,
223 key = key,
224 value = value
225 ));
226 }
227 }
228
229 for (key, value) in &result.derived_quantities {
231 triples.push(format!(
232 "<{run_id}> phys:{key} {value}^^xsd:double .",
233 run_id = result.simulation_run_id,
234 key = key,
235 value = value
236 ));
237 }
238
239 format!(
240 r#"
241 PREFIX phys: <{phys}>
242 PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
243
244 INSERT DATA {{
245 {triples}
246 }}
247 "#,
248 phys = phys,
249 triples = triples.join("\n ")
250 )
251 }
252
253 fn generate_provenance_update(&self, result: &SimulationResult) -> String {
255 let prov = &self.config.prov_prefix;
256 let phys = &self.config.physics_prefix;
257
258 format!(
259 r#"
260 PREFIX prov: <{prov}>
261 PREFIX phys: <{phys}>
262 PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
263
264 INSERT DATA {{
265 <{entity}> prov:wasGeneratedBy <{run_id}> .
266 <{run_id}> a prov:Activity .
267 <{run_id}> prov:startedAtTime "{executed_at}"^^xsd:dateTime .
268 <{run_id}> prov:used <{entity}> .
269 <{run_id}> prov:wasAssociatedWith <{software_agent}> .
270 <{software_agent}> a prov:SoftwareAgent .
271 <{software_agent}> prov:label "{software}"^^xsd:string .
272 <{software_agent}> phys:version "{version}"^^xsd:string .
273 <{run_id}> phys:parametersHash "{params_hash}"^^xsd:string .
274 <{run_id}> phys:executionTimeMs {exec_time}^^xsd:integer .
275 }}
276 "#,
277 prov = prov,
278 phys = phys,
279 entity = result.entity_iri,
280 run_id = result.simulation_run_id,
281 executed_at = result.provenance.executed_at.to_rfc3339(),
282 software_agent = format!("urn:agent:{}", result.provenance.software),
283 software = result.provenance.software,
284 version = result.provenance.version,
285 params_hash = result.provenance.parameters_hash,
286 exec_time = result.provenance.execution_time_ms,
287 )
288 }
289
290 async fn inject_in_batches(
292 &self,
293 store: &RdfStore,
294 result: &SimulationResult,
295 ) -> PhysicsResult<()> {
296 let phys = &self.config.physics_prefix;
297
298 for (batch_idx, chunk) in result
299 .state_trajectory
300 .chunks(self.config.batch_size)
301 .enumerate()
302 {
303 let mut triples = Vec::new();
304
305 for (idx_in_chunk, state) in chunk.iter().enumerate() {
306 let global_idx = batch_idx * self.config.batch_size + idx_in_chunk;
307 let state_id = format!("{}#state_{}", result.simulation_run_id, global_idx);
308
309 triples.push(format!(
310 "<{run_id}> phys:hasState <{state_id}> .",
311 run_id = result.simulation_run_id,
312 state_id = state_id
313 ));
314 triples.push(format!(
315 "<{state_id}> phys:time {time}^^xsd:double .",
316 state_id = state_id,
317 time = state.time
318 ));
319
320 for (key, value) in &state.state {
321 triples.push(format!(
322 "<{state_id}> phys:{key} {value}^^xsd:double .",
323 state_id = state_id,
324 key = key,
325 value = value
326 ));
327 }
328 }
329
330 let batch_update = format!(
331 r#"
332 PREFIX phys: <{phys}>
333 PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
334
335 INSERT DATA {{
336 {triples}
337 }}
338 "#,
339 phys = phys,
340 triples = triples.join("\n ")
341 );
342
343 self.execute_update(store, &batch_update).await?;
344
345 tracing::debug!("Injected batch {} ({} states)", batch_idx + 1, chunk.len());
346 }
347
348 Ok(())
349 }
350
351 pub fn create_result_node(
353 &self,
354 _entity: &NamedNode,
355 property: &str,
356 ) -> PhysicsResult<NamedNode> {
357 let result_id = Uuid::new_v4();
358 let result_iri = format!(
359 "{}result_{}_{}",
360 self.config.physics_prefix, property, result_id
361 );
362
363 NamedNode::new(&result_iri)
364 .map_err(|e| PhysicsError::ResultInjection(format!("Invalid result IRI: {}", e)))
365 }
366
367 pub async fn write_timestamped_value(
369 &self,
370 store: &RdfStore,
371 result_node: &NamedNode,
372 value: &ResultValue,
373 ) -> PhysicsResult<()> {
374 let timestamp = Utc::now();
375 let phys = &self.config.physics_prefix;
376
377 let update = match &value.value {
378 ResultData::Scalar(v) => {
379 format!(
380 r#"
381 PREFIX phys: <{phys}>
382 PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
383
384 INSERT DATA {{
385 <{node}> phys:property "{property}"^^xsd:string .
386 <{node}> phys:value {value}^^xsd:double .
387 <{node}> phys:timestamp "{timestamp}"^^xsd:dateTime .
388 }}
389 "#,
390 phys = phys,
391 node = result_node.as_str(),
392 property = value.property,
393 value = v,
394 timestamp = timestamp.to_rfc3339(),
395 )
396 }
397 ResultData::Vector(vec) => {
398 let mut triples = Vec::new();
399 triples.push(format!(
400 "<{node}> phys:property \"{property}\"^^xsd:string .",
401 node = result_node.as_str(),
402 property = value.property
403 ));
404 triples.push(format!(
405 "<{node}> phys:timestamp \"{timestamp}\"^^xsd:dateTime .",
406 node = result_node.as_str(),
407 timestamp = timestamp.to_rfc3339()
408 ));
409
410 for (i, v) in vec.iter().enumerate() {
411 triples.push(format!(
412 "<{node}> phys:component{i} {v}^^xsd:double .",
413 node = result_node.as_str(),
414 i = i,
415 v = v
416 ));
417 }
418
419 format!(
420 r#"
421 PREFIX phys: <{phys}>
422 PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
423
424 INSERT DATA {{
425 {triples}
426 }}
427 "#,
428 phys = phys,
429 triples = triples.join("\n ")
430 )
431 }
432 ResultData::Tensor(tensor) => {
433 let mut triples = Vec::new();
434 triples.push(format!(
435 "<{node}> phys:property \"{property}\"^^xsd:string .",
436 node = result_node.as_str(),
437 property = value.property
438 ));
439
440 for (i, row) in tensor.iter().enumerate() {
441 for (j, v) in row.iter().enumerate() {
442 triples.push(format!(
443 "<{node}> phys:tensor_{i}_{j} {v}^^xsd:double .",
444 node = result_node.as_str(),
445 i = i,
446 j = j,
447 v = v
448 ));
449 }
450 }
451
452 format!(
453 r#"
454 PREFIX phys: <{phys}>
455 PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
456
457 INSERT DATA {{
458 {triples}
459 }}
460 "#,
461 phys = phys,
462 triples = triples.join("\n ")
463 )
464 }
465 ResultData::TimeSeries(series) => {
466 let mut triples = Vec::new();
467 triples.push(format!(
468 "<{node}> phys:property \"{property}\"^^xsd:string .",
469 node = result_node.as_str(),
470 property = value.property
471 ));
472
473 for (i, (time, value)) in series.iter().enumerate() {
474 let point_id = format!("{}#point_{}", result_node.as_str(), i);
475 triples.push(format!(
476 "<{node}> phys:hasPoint <{point_id}> .",
477 node = result_node.as_str(),
478 point_id = point_id
479 ));
480 triples.push(format!(
481 "<{point_id}> phys:time {time}^^xsd:double .",
482 point_id = point_id,
483 time = time
484 ));
485 triples.push(format!(
486 "<{point_id}> phys:value {value}^^xsd:double .",
487 point_id = point_id,
488 value = value
489 ));
490 }
491
492 format!(
493 r#"
494 PREFIX phys: <{phys}>
495 PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
496
497 INSERT DATA {{
498 {triples}
499 }}
500 "#,
501 phys = phys,
502 triples = triples.join("\n ")
503 )
504 }
505 };
506
507 self.execute_update(store, &update).await
508 }
509
510 pub async fn add_provenance(
512 &self,
513 store: &RdfStore,
514 result_node: &NamedNode,
515 provenance: &ProvenanceInfo,
516 ) -> PhysicsResult<()> {
517 let prov = &self.config.prov_prefix;
518
519 let update = format!(
520 r#"
521 PREFIX prov: <{prov}>
522 PREFIX xsd: <http://www.w3.org/2001/XMLSchema#>
523
524 INSERT DATA {{
525 <{node}> prov:wasGeneratedBy <{activity}> .
526 <{activity}> a prov:Activity .
527 <{activity}> prov:startedAtTime "{timestamp}"^^xsd:dateTime .
528 <{activity}> prov:wasAssociatedWith <{agent}> .
529 <{agent}> a prov:SoftwareAgent .
530 <{agent}> prov:label "{software}"^^xsd:string .
531 }}
532 "#,
533 prov = prov,
534 node = result_node.as_str(),
535 activity = provenance.activity_id,
536 timestamp = provenance.timestamp.to_rfc3339(),
537 agent = provenance.agent_id,
538 software = provenance.software,
539 );
540
541 self.execute_update(store, &update).await
542 }
543
544 pub fn begin_transaction(&self) -> PhysicsResult<Transaction> {
546 Ok(Transaction {
547 id: Uuid::new_v4().to_string(),
548 updates: Vec::new(),
549 })
550 }
551
552 pub async fn commit_transaction(&self, store: &RdfStore, tx: Transaction) -> PhysicsResult<()> {
554 for update in tx.updates {
555 self.execute_update(store, &update).await?;
556 }
557 Ok(())
558 }
559}
560
561impl Default for ResultInjector {
562 fn default() -> Self {
563 Self::new()
564 }
565}
566
567#[derive(Debug, Clone)]
569pub struct Transaction {
570 pub id: String,
571 pub updates: Vec<String>,
572}
573
574#[derive(Debug, Clone)]
576pub struct ProvenanceInfo {
577 pub activity_id: String,
578 pub agent_id: String,
579 pub software: String,
580 pub timestamp: DateTime<Utc>,
581}
582
583#[derive(Debug, Clone)]
585pub struct ResultValue {
586 pub property: String,
587 pub value: ResultData,
588}
589
590#[derive(Debug, Clone)]
592pub enum ResultData {
593 Scalar(f64),
594 Vector(Vec<f64>),
595 Tensor(Vec<Vec<f64>>),
596 TimeSeries(Vec<(f64, f64)>),
597}
598
599#[derive(Debug, Clone, Serialize, Deserialize)]
601pub struct SimulationResult {
602 pub entity_iri: String,
603 pub simulation_run_id: String,
604 pub timestamp: DateTime<Utc>,
605 pub state_trajectory: Vec<StateVector>,
606 pub derived_quantities: HashMap<String, f64>,
607 pub convergence_info: ConvergenceInfo,
608 pub provenance: SimulationProvenance,
609}
610
611#[derive(Debug, Clone, Serialize, Deserialize)]
613pub struct StateVector {
614 pub time: f64,
615 pub state: HashMap<String, f64>,
616}
617
618#[derive(Debug, Clone, Serialize, Deserialize)]
620pub struct ConvergenceInfo {
621 pub converged: bool,
622 pub iterations: usize,
623 pub final_residual: f64,
624}
625
626#[derive(Debug, Clone, Serialize, Deserialize)]
628pub struct SimulationProvenance {
629 pub software: String,
630 pub version: String,
631 pub parameters_hash: String,
632 pub executed_at: DateTime<Utc>,
633 pub execution_time_ms: u64,
634}
635
636#[cfg(test)]
637mod tests {
638 use super::*;
639
640 fn create_test_result() -> SimulationResult {
641 let mut trajectory = Vec::new();
642
643 for i in 0..10 {
644 let mut state = HashMap::new();
645 state.insert("temperature".to_string(), 300.0 + i as f64);
646 trajectory.push(StateVector {
647 time: i as f64 * 10.0,
648 state,
649 });
650 }
651
652 SimulationResult {
653 entity_iri: "urn:example:battery:001".to_string(),
654 simulation_run_id: "run-123".to_string(),
655 timestamp: Utc::now(),
656 state_trajectory: trajectory,
657 derived_quantities: HashMap::new(),
658 convergence_info: ConvergenceInfo {
659 converged: true,
660 iterations: 100,
661 final_residual: 1e-6,
662 },
663 provenance: SimulationProvenance {
664 software: "oxirs-physics".to_string(),
665 version: "0.1.0".to_string(),
666 parameters_hash: "abc123".to_string(),
667 executed_at: Utc::now(),
668 execution_time_ms: 1500,
669 },
670 }
671 }
672
673 #[tokio::test]
674 async fn test_result_injector_basic() {
675 let injector = ResultInjector::new();
676 let result = create_test_result();
677
678 assert!(injector.inject(&result).await.is_ok());
680 }
681
682 #[tokio::test]
683 async fn test_result_injector_without_provenance() {
684 let injector = ResultInjector::new().without_provenance();
685 let result = create_test_result();
686
687 assert!(injector.inject(&result).await.is_ok());
688 }
689
690 #[tokio::test]
691 async fn test_validate_result_empty_entity_iri() {
692 let injector = ResultInjector::new();
693 let mut result = create_test_result();
694 result.entity_iri = String::new();
695
696 assert!(injector.inject(&result).await.is_err());
698 }
699
700 #[tokio::test]
701 async fn test_validate_result_empty_run_id() {
702 let injector = ResultInjector::new();
703 let mut result = create_test_result();
704 result.simulation_run_id = String::new();
705
706 assert!(injector.inject(&result).await.is_err());
708 }
709
710 #[tokio::test]
711 async fn test_validate_result_empty_trajectory() {
712 let injector = ResultInjector::new();
713 let mut result = create_test_result();
714 result.state_trajectory.clear();
715
716 assert!(injector.inject(&result).await.is_err());
718 }
719
720 #[test]
721 fn test_generate_metadata_update() {
722 let injector = ResultInjector::new();
723 let result = create_test_result();
724
725 let query = injector.generate_metadata_update(&result);
726
727 assert!(query.contains("INSERT DATA"));
729 assert!(query.contains("phys:SimulationRun"));
730 assert!(query.contains(&result.simulation_run_id));
731 assert!(query.contains(&result.entity_iri));
732 assert!(query.contains("phys:converged"));
733 assert!(query.contains("phys:iterations"));
734 }
735
736 #[test]
737 fn test_generate_state_trajectory_update() {
738 let injector = ResultInjector::new();
739 let result = create_test_result();
740
741 let query = injector.generate_state_trajectory_update(&result);
742
743 assert!(query.contains("INSERT DATA"));
745 assert!(query.contains("phys:hasState"));
746 assert!(query.contains("phys:time"));
747 assert!(query.contains(&result.simulation_run_id));
748 }
749
750 #[test]
751 fn test_generate_provenance_update() {
752 let injector = ResultInjector::new();
753 let result = create_test_result();
754
755 let query = injector.generate_provenance_update(&result);
756
757 assert!(query.contains("prov:wasGeneratedBy"));
759 assert!(query.contains("prov:Activity"));
760 assert!(query.contains("prov:SoftwareAgent"));
761 assert!(query.contains(&result.provenance.software));
762 assert!(query.contains(&result.provenance.version));
763 assert!(query.contains(&result.provenance.parameters_hash));
764 }
765
766 #[test]
767 fn test_simulation_result_serialization() {
768 let result = create_test_result();
769
770 let json = serde_json::to_string(&result).expect("Failed to serialize");
771 let deserialized: SimulationResult =
772 serde_json::from_str(&json).expect("Failed to deserialize");
773
774 assert_eq!(deserialized.entity_iri, result.entity_iri);
775 assert_eq!(deserialized.simulation_run_id, result.simulation_run_id);
776 assert_eq!(
777 deserialized.state_trajectory.len(),
778 result.state_trajectory.len()
779 );
780 assert_eq!(
781 deserialized.convergence_info.converged,
782 result.convergence_info.converged
783 );
784 }
785
786 #[test]
787 fn test_state_vector_serialization() {
788 let mut state = HashMap::new();
789 state.insert("temperature".to_string(), 300.0);
790 state.insert("pressure".to_string(), 101325.0);
791
792 let vector = StateVector {
793 time: 10.0,
794 state: state.clone(),
795 };
796
797 let json = serde_json::to_string(&vector).expect("Failed to serialize");
798 let deserialized: StateVector = serde_json::from_str(&json).expect("Failed to deserialize");
799
800 assert_eq!(deserialized.time, 10.0);
801 assert_eq!(deserialized.state.len(), 2);
802 assert_eq!(deserialized.state.get("temperature"), Some(&300.0));
803 }
804
805 #[test]
806 fn test_convergence_info() {
807 let info = ConvergenceInfo {
808 converged: true,
809 iterations: 150,
810 final_residual: 5e-7,
811 };
812
813 assert!(info.converged);
814 assert_eq!(info.iterations, 150);
815 assert!(info.final_residual < 1e-6);
816 }
817
818 #[test]
819 fn test_provenance_tracking() {
820 let prov = SimulationProvenance {
821 software: "oxirs-physics".to_string(),
822 version: "0.1.0".to_string(),
823 parameters_hash: "def456".to_string(),
824 executed_at: Utc::now(),
825 execution_time_ms: 2500,
826 };
827
828 assert_eq!(prov.software, "oxirs-physics");
829 assert_eq!(prov.version, "0.1.0");
830 assert_eq!(prov.execution_time_ms, 2500);
831 }
832
833 #[test]
834 fn test_injection_config() {
835 let config = InjectionConfig {
836 physics_prefix: "http://example.org/phys#".to_string(),
837 prov_prefix: "http://example.org/prov#".to_string(),
838 batch_size: 500,
839 use_transactions: false,
840 };
841
842 assert_eq!(config.physics_prefix, "http://example.org/phys#");
843 assert_eq!(config.batch_size, 500);
844 assert!(!config.use_transactions);
845 }
846
847 #[test]
848 fn test_create_result_node() {
849 let injector = ResultInjector::new();
850 let entity = NamedNode::new("http://example.org/entity1").expect("Failed to create node");
851
852 let result_node = injector
853 .create_result_node(&entity, "displacement")
854 .expect("Failed to create result node");
855
856 assert!(result_node.as_str().contains("displacement"));
857 assert!(result_node
858 .as_str()
859 .starts_with("http://oxirs.org/physics#result_"));
860 }
861
862 #[test]
863 fn test_transaction() {
864 let injector = ResultInjector::new();
865 let tx = injector
866 .begin_transaction()
867 .expect("Failed to begin transaction");
868
869 assert!(!tx.id.is_empty());
870 assert!(tx.updates.is_empty());
871 }
872}