1use reqwest::Client;
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::fmt;
10use std::sync::Arc;
11use std::time::{Duration, Instant};
12
13use crate::{
14 cache::{CacheConfig, FederationCache},
15 planner::StepType,
16 service_executor::{JoinExecutor, ServiceExecutor, ServiceExecutorConfig},
17};
18
/// Top-level executor for federated queries, combining an HTTP client,
/// per-service execution, cross-service joins, and a federation cache.
#[derive(Debug)]
pub struct FederatedExecutor {
    // Shared HTTP client for outbound requests to remote services.
    pub client: Client,
    // Executor-wide configuration (timeouts, parallelism, feature flags).
    pub config: FederatedExecutorConfig,
    // Executes individual service-level steps (see `ServiceExecutor`).
    pub service_executor: Arc<ServiceExecutor>,
    // Performs joins across results from multiple services (see `JoinExecutor`).
    pub join_executor: Arc<JoinExecutor>,
    // Federation-level cache (see `FederationCache` for what is cached).
    pub cache: Arc<FederationCache>,
}
28
/// Configuration for [`FederatedExecutor`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct FederatedExecutorConfig {
    // Timeout applied to outbound requests.
    pub request_timeout: Duration,
    // Upper bound on concurrently issued requests.
    pub max_parallel_requests: usize,
    // User-Agent header value sent with requests.
    pub user_agent: String,
    // Federation cache settings.
    pub cache_config: CacheConfig,
    // Per-service executor settings.
    pub service_executor_config: ServiceExecutorConfig,
    // Enables adaptive execution-strategy selection.
    pub enable_adaptive_execution: bool,
    // Enables performance monitoring/sampling.
    pub enable_performance_monitoring: bool,
    // Enables the per-service circuit breaker.
    pub enable_circuit_breaker: bool,
    // Circuit-breaker tuning (used when `enable_circuit_breaker` is set).
    pub circuit_breaker_config: CircuitBreakerConfig,
    // Retry/backoff policy for failed requests.
    pub retry_config: RetryConfig,
}
43
44impl Default for FederatedExecutorConfig {
45 fn default() -> Self {
46 Self {
47 request_timeout: Duration::from_secs(30),
48 max_parallel_requests: 10,
49 user_agent: "oxirs-federate/1.0".to_string(),
50 cache_config: CacheConfig::default(),
51 service_executor_config: ServiceExecutorConfig::default(),
52 enable_adaptive_execution: true,
53 enable_performance_monitoring: true,
54 enable_circuit_breaker: true,
55 circuit_breaker_config: CircuitBreakerConfig::default(),
56 retry_config: RetryConfig::default(),
57 }
58 }
59}
60
61#[derive(Debug, Clone, Serialize, Deserialize)]
63pub struct CircuitBreakerConfig {
64 pub failure_threshold: usize,
65 pub recovery_timeout: Duration,
66 pub half_open_max_calls: usize,
67}
68
69impl Default for CircuitBreakerConfig {
70 fn default() -> Self {
71 Self {
72 failure_threshold: 5,
73 recovery_timeout: Duration::from_secs(60),
74 half_open_max_calls: 3,
75 }
76 }
77}
78
79#[derive(Debug, Clone, Serialize, Deserialize)]
81pub struct RetryConfig {
82 pub max_retries: usize,
83 pub base_delay: Duration,
84 pub max_delay: Duration,
85 pub backoff_multiplier: f64,
86}
87
88impl Default for RetryConfig {
89 fn default() -> Self {
90 Self {
91 max_retries: 3,
92 base_delay: Duration::from_millis(100),
93 max_delay: Duration::from_secs(10),
94 backoff_multiplier: 2.0,
95 }
96 }
97}
98
/// Outcome of a single execution-plan step.
///
/// NOTE(review): `status`/`success` and `error`/`error_message` look like
/// duplicated pairs; both are kept for interface compatibility — confirm
/// which one callers treat as authoritative before consolidating.
#[derive(Debug, Clone)]
pub struct StepResult {
    // Identifier of the plan step this result belongs to.
    pub step_id: String,
    // Kind of step that was executed (see planner::StepType).
    pub step_type: StepType,
    // Coarse completion status of the step.
    pub status: ExecutionStatus,
    // Result payload, when the step produced one.
    pub data: Option<QueryResultData>,
    // Error description when the step failed.
    pub error: Option<String>,
    // Wall time spent executing the step.
    pub execution_time: Duration,
    // Service that executed the step, if any.
    pub service_id: Option<String>,
    // Memory consumed by the step (units not established here — presumably bytes).
    pub memory_used: usize,
    // Size of the produced result (units not established here).
    pub result_size: usize,
    // Convenience success flag (appears redundant with `status`).
    pub success: bool,
    // Convenience error text (appears redundant with `error`).
    pub error_message: Option<String>,
    // Time the remote service took to respond.
    pub service_response_time: Duration,
    // Whether the result was served from cache.
    pub cache_hit: bool,
}
116
/// Final status of an executed plan step.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExecutionStatus {
    // Step completed and produced a result.
    Success,
    // Step failed with an error.
    Failed,
    // Step exceeded its time budget.
    Timeout,
    // Step was cancelled before completion.
    Cancelled,
}
125
/// SPARQL SELECT results in the SPARQL 1.1 Query Results JSON layout.
///
/// NOTE(review): ASK responses carry a top-level `boolean` key instead of
/// `results`, which this struct cannot represent — confirm only SELECT-style
/// responses are routed here.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SparqlResults {
    // Header listing the projected variable names.
    pub head: SparqlHead,
    // The solution sequence (bindings).
    pub results: SparqlResultsData,
}
132
/// `head` section of a SPARQL JSON result: the projected variable names.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SparqlHead {
    // Variable names, without leading `?`.
    pub vars: Vec<String>,
}
138
/// `results` section of a SPARQL JSON result: the list of solution bindings.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SparqlResultsData {
    // One entry per solution; each maps variable name -> bound term.
    pub bindings: Vec<SparqlBinding>,
}
144
145impl SparqlResultsData {
146 pub fn len(&self) -> usize {
147 self.bindings.len()
148 }
149
150 pub fn is_empty(&self) -> bool {
151 self.bindings.is_empty()
152 }
153}
154
/// One SPARQL solution mapping: variable name -> bound term.
pub type SparqlBinding = HashMap<String, SparqlValue>;
157
158#[derive(Debug, Clone, Serialize, Deserialize)]
160pub struct SparqlValue {
161 #[serde(rename = "type")]
162 pub value_type: String,
163 pub value: String,
164 pub datatype: Option<String>,
165 pub lang: Option<String>,
166 #[serde(skip_serializing_if = "Option::is_none")]
168 pub quoted_triple: Option<QuotedTripleValue>,
169}
170
/// Structured form of an RDF-star quoted (embedded) triple.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct QuotedTripleValue {
    // Subject term; boxed because `RdfTerm` is recursive.
    pub subject: Box<RdfTerm>,
    // Predicate term.
    pub predicate: Box<RdfTerm>,
    // Object term.
    pub object: Box<RdfTerm>,
}
178
/// An RDF term, including RDF-star quoted triples.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum RdfTerm {
    // An IRI reference.
    IRI(String),
    // A blank node, identified by its label.
    BlankNode(String),
    // A literal with optional datatype IRI and language tag.
    Literal {
        value: String,
        datatype: Option<String>,
        lang: Option<String>,
    },
    // An RDF-star quoted (embedded) triple.
    QuotedTriple(QuotedTripleValue),
}
195
196impl SparqlValue {
197 pub fn literal(value: String, datatype: Option<String>, lang: Option<String>) -> Self {
199 Self {
200 value_type: "literal".to_string(),
201 value,
202 datatype,
203 lang,
204 quoted_triple: None,
205 }
206 }
207
208 pub fn iri(iri: String) -> Self {
210 Self {
211 value_type: "uri".to_string(),
212 value: iri,
213 datatype: None,
214 lang: None,
215 quoted_triple: None,
216 }
217 }
218
219 pub fn blank_node(id: String) -> Self {
221 Self {
222 value_type: "bnode".to_string(),
223 value: id,
224 datatype: None,
225 lang: None,
226 quoted_triple: None,
227 }
228 }
229
230 pub fn quoted_triple(subject: RdfTerm, predicate: RdfTerm, object: RdfTerm) -> Self {
232 let quoted_triple = QuotedTripleValue {
233 subject: Box::new(subject.clone()),
234 predicate: Box::new(predicate.clone()),
235 object: Box::new(object.clone()),
236 };
237
238 Self {
239 value_type: "quoted_triple".to_string(),
240 value: format!(
241 "<<{} {} {}>>",
242 quoted_triple.subject, quoted_triple.predicate, quoted_triple.object
243 ),
244 datatype: None,
245 lang: None,
246 quoted_triple: Some(quoted_triple),
247 }
248 }
249
250 pub fn is_quoted_triple(&self) -> bool {
252 self.quoted_triple.is_some()
253 }
254
255 pub fn get_quoted_triple(&self) -> Option<&QuotedTripleValue> {
257 self.quoted_triple.as_ref()
258 }
259
260 pub fn to_encoded_format(&self) -> EncodedSparqlValue {
262 match &self.quoted_triple {
263 Some(qt) => EncodedSparqlValue::QuotedTriple {
264 subject: qt.subject.to_encoded_term(),
265 predicate: qt.predicate.to_encoded_term(),
266 object: qt.object.to_encoded_term(),
267 },
268 None => match self.value_type.as_str() {
269 "uri" => EncodedSparqlValue::IRI(self.value.clone()),
270 "bnode" => EncodedSparqlValue::BlankNode(self.value.clone()),
271 "literal" => EncodedSparqlValue::Literal {
272 value: self.value.clone(),
273 datatype: self.datatype.clone(),
274 lang: self.lang.clone(),
275 },
276 _ => EncodedSparqlValue::Unknown(self.value.clone()),
277 },
278 }
279 }
280}
281
282impl fmt::Display for RdfTerm {
283 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
284 match self {
285 RdfTerm::IRI(iri) => write!(f, "<{iri}>"),
286 RdfTerm::BlankNode(id) => write!(f, "_:{id}"),
287 RdfTerm::Literal {
288 value,
289 datatype,
290 lang,
291 } => {
292 write!(f, "\"{value}\"")?;
293 if let Some(lang) = lang {
294 write!(f, "@{lang}")
295 } else if let Some(datatype) = datatype {
296 write!(f, "^^<{datatype}>")
297 } else {
298 Ok(())
299 }
300 }
301 RdfTerm::QuotedTriple(qt) => {
302 write!(f, "<<{} {} {}>>", qt.subject, qt.predicate, qt.object)
303 }
304 }
305 }
306}
307
impl RdfTerm {
    /// Converts this term — recursively, for quoted triples — into the
    /// [`EncodedTerm`] representation.
    pub fn to_encoded_term(&self) -> EncodedTerm {
        match self {
            RdfTerm::IRI(iri) => EncodedTerm::IRI(iri.clone()),
            RdfTerm::BlankNode(id) => EncodedTerm::BlankNode(id.clone()),
            RdfTerm::Literal {
                value,
                datatype,
                lang,
            } => EncodedTerm::Literal {
                value: value.clone(),
                datatype: datatype.clone(),
                lang: lang.clone(),
            },
            // Recurse into each component of a quoted triple.
            RdfTerm::QuotedTriple(qt) => EncodedTerm::QuotedTriple {
                subject: Box::new(qt.subject.to_encoded_term()),
                predicate: Box::new(qt.predicate.to_encoded_term()),
                object: Box::new(qt.object.to_encoded_term()),
            },
        }
    }
}
331
/// Encoded counterpart of [`SparqlValue`], with the term kind expressed as an
/// enum variant instead of a string tag.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum EncodedSparqlValue {
    // An IRI reference.
    IRI(String),
    // A blank node label.
    BlankNode(String),
    // A literal with optional datatype IRI and language tag.
    Literal {
        value: String,
        datatype: Option<String>,
        lang: Option<String>,
    },
    // An RDF-star quoted triple (components already encoded).
    QuotedTriple {
        subject: EncodedTerm,
        predicate: EncodedTerm,
        object: EncodedTerm,
    },
    // A value whose type tag was not recognized; raw value preserved.
    Unknown(String),
}
349
/// Encoded counterpart of [`RdfTerm`]; quoted-triple components are boxed
/// because the type is recursive.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum EncodedTerm {
    // An IRI reference.
    IRI(String),
    // A blank node label.
    BlankNode(String),
    // A literal with optional datatype IRI and language tag.
    Literal {
        value: String,
        datatype: Option<String>,
        lang: Option<String>,
    },
    // An RDF-star quoted triple.
    QuotedTriple {
        subject: Box<EncodedTerm>,
        predicate: Box<EncodedTerm>,
        object: Box<EncodedTerm>,
    },
}
366
impl EncodedSparqlValue {
    /// Converts back to the wire-level [`SparqlValue`] representation,
    /// rebuilding the `<<s p o>>` rendering for quoted triples via
    /// [`SparqlValue::quoted_triple`].
    pub fn to_sparql_value(&self) -> SparqlValue {
        match self {
            EncodedSparqlValue::IRI(iri) => SparqlValue::iri(iri.clone()),
            EncodedSparqlValue::BlankNode(id) => SparqlValue::blank_node(id.clone()),
            EncodedSparqlValue::Literal {
                value,
                datatype,
                lang,
            } => SparqlValue::literal(value.clone(), datatype.clone(), lang.clone()),
            EncodedSparqlValue::QuotedTriple {
                subject,
                predicate,
                object,
            } => SparqlValue::quoted_triple(
                subject.to_rdf_term(),
                predicate.to_rdf_term(),
                object.to_rdf_term(),
            ),
            // Unknown values round-trip with a literal "unknown" type tag.
            EncodedSparqlValue::Unknown(value) => SparqlValue {
                value_type: "unknown".to_string(),
                value: value.clone(),
                datatype: None,
                lang: None,
                quoted_triple: None,
            },
        }
    }
}
397
impl EncodedTerm {
    /// Converts back to [`RdfTerm`], recursing through quoted-triple
    /// components. Inverse of [`RdfTerm::to_encoded_term`].
    pub fn to_rdf_term(&self) -> RdfTerm {
        match self {
            EncodedTerm::IRI(iri) => RdfTerm::IRI(iri.clone()),
            EncodedTerm::BlankNode(id) => RdfTerm::BlankNode(id.clone()),
            EncodedTerm::Literal {
                value,
                datatype,
                lang,
            } => RdfTerm::Literal {
                value: value.clone(),
                datatype: datatype.clone(),
                lang: lang.clone(),
            },
            // Rebuild the boxed triple component by component.
            EncodedTerm::QuotedTriple {
                subject,
                predicate,
                object,
            } => RdfTerm::QuotedTriple(QuotedTripleValue {
                subject: Box::new(subject.to_rdf_term()),
                predicate: Box::new(predicate.to_rdf_term()),
                object: Box::new(object.to_rdf_term()),
            }),
        }
    }
}
425
426#[derive(Debug, Clone, Serialize, Deserialize)]
428pub struct GraphQLResponse {
429 pub data: serde_json::Value,
430 pub errors: Vec<GraphQLError>,
431 pub extensions: Option<serde_json::Value>,
432}
433
/// A single entry of a GraphQL response's `errors` list.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GraphQLError {
    // Human-readable error description.
    pub message: String,
    // Source locations in the query document, when provided.
    pub locations: Option<Vec<GraphQLLocation>>,
    // Response path to the field that errored (strings and indices).
    pub path: Option<Vec<serde_json::Value>>,
}
441
/// A line/column position in a GraphQL query document.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GraphQLLocation {
    // 1-based line number (per GraphQL convention — confirm against producer).
    pub line: u32,
    // 1-based column number (per GraphQL convention — confirm against producer).
    pub column: u32,
}
448
/// Result payload of a plan step, tagged by the protocol that produced it.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum QueryResultData {
    // SPARQL JSON results.
    Sparql(SparqlResults),
    // GraphQL response envelope.
    GraphQL(GraphQLResponse),
    // Raw JSON from a generic service call.
    ServiceResult(serde_json::Value),
}
456
/// Aggregate runtime statistics gathered while executing a federated plan.
#[derive(Debug, Clone)]
pub struct RuntimeStatistics {
    // Start instant of each executed group, keyed by group index.
    pub group_start_times: HashMap<usize, Instant>,
    // Measured duration of each finished group, keyed by group index.
    pub group_durations: HashMap<usize, Duration>,
    // Total wall time of the whole execution.
    pub total_execution_time: Duration,
    // Number of groups that have finished.
    pub groups_executed: usize,
    // Number of steps attempted so far.
    pub total_steps_executed: usize,
    // Steps that completed successfully.
    pub successful_steps: usize,
    // Steps that failed.
    pub failed_steps: usize,
    // Mean duration across all finished groups.
    pub average_group_time: Duration,
    // Highest memory usage observed (presumably bytes — set by callers).
    pub peak_memory_usage: u64,
    // Highest CPU usage observed (scale set by callers).
    pub peak_cpu_usage: f64,
}

impl Default for RuntimeStatistics {
    /// Same as [`RuntimeStatistics::new`].
    fn default() -> Self {
        RuntimeStatistics::new()
    }
}

impl RuntimeStatistics {
    /// Creates a zeroed statistics record.
    pub fn new() -> Self {
        RuntimeStatistics {
            group_start_times: HashMap::new(),
            group_durations: HashMap::new(),
            total_execution_time: Duration::from_secs(0),
            groups_executed: 0,
            total_steps_executed: 0,
            successful_steps: 0,
            failed_steps: 0,
            average_group_time: Duration::from_secs(0),
            peak_memory_usage: 0,
            peak_cpu_usage: 0.0,
        }
    }

    /// Notes when group `group_idx` began executing.
    pub fn update_group_start(&mut self, group_idx: usize, start_time: Instant) {
        self.group_start_times.insert(group_idx, start_time);
    }

    /// Records the duration of a finished group and refreshes the running
    /// average over all finished groups.
    pub fn update_group_end(&mut self, group_idx: usize, duration: Duration) {
        self.group_durations.insert(group_idx, duration);
        self.groups_executed += 1;

        let total: Duration = self.group_durations.values().sum();
        self.average_group_time = total / self.groups_executed.max(1) as u32;
    }

    /// Tallies the outcome of one step.
    pub fn record_step_result(&mut self, success: bool) {
        self.total_steps_executed += 1;
        if success {
            self.successful_steps += 1;
        } else {
            self.failed_steps += 1;
        }
    }

    /// Fraction of steps that succeeded; 0.0 before any step has run.
    pub fn get_success_rate(&self) -> f64 {
        match self.total_steps_executed {
            0 => 0.0,
            n => self.successful_steps as f64 / n as f64,
        }
    }
}
524
/// Collects execution-time, resource, error, and bottleneck samples for
/// later performance analysis (averages, percentiles, error rate).
#[derive(Debug)]
pub struct EnhancedPerformanceMonitor {
    // Duration of each individual step execution.
    step_durations: Vec<Duration>,
    // Wall time of each parallel execution phase.
    parallel_durations: Vec<Duration>,
    // Wall time of each sequential execution phase.
    sequential_durations: Vec<Duration>,
    // Memory usage samples (units set by callers — presumably bytes).
    memory_usage_samples: Vec<u64>,
    // CPU usage samples (scale set by callers).
    cpu_usage_samples: Vec<f64>,
    // Network round-trip latency samples.
    network_latency_samples: Vec<Duration>,
    // Count of recorded errors, keyed by error-type string.
    error_counts: HashMap<String, usize>,
    // Occurrence count per bottleneck kind.
    bottlenecks: HashMap<BottleneckType, usize>,
}
537
538impl Default for EnhancedPerformanceMonitor {
539 fn default() -> Self {
540 Self::new()
541 }
542}
543
544impl EnhancedPerformanceMonitor {
545 pub fn new() -> Self {
546 Self {
547 step_durations: Vec::new(),
548 parallel_durations: Vec::new(),
549 sequential_durations: Vec::new(),
550 memory_usage_samples: Vec::new(),
551 cpu_usage_samples: Vec::new(),
552 network_latency_samples: Vec::new(),
553 error_counts: HashMap::new(),
554 bottlenecks: HashMap::new(),
555 }
556 }
557
558 pub fn record_step_execution(&mut self, duration: Duration) {
559 self.step_durations.push(duration);
560 }
561
562 pub fn record_parallel_execution(&mut self, duration: Duration) {
563 self.parallel_durations.push(duration);
564 }
565
566 pub fn record_sequential_execution(&mut self, duration: Duration) {
567 self.sequential_durations.push(duration);
568 }
569
570 pub fn record_memory_usage(&mut self, usage: u64) {
571 self.memory_usage_samples.push(usage);
572 }
573
574 pub fn record_cpu_usage(&mut self, usage: f64) {
575 self.cpu_usage_samples.push(usage);
576 }
577
578 pub fn record_network_latency(&mut self, latency: Duration) {
579 self.network_latency_samples.push(latency);
580 }
581
582 pub fn record_error(&mut self, error_type: String) {
583 *self.error_counts.entry(error_type).or_insert(0) += 1;
584 }
585
586 pub fn record_bottleneck(&mut self, bottleneck: BottleneckType) {
587 *self.bottlenecks.entry(bottleneck).or_insert(0) += 1;
588 }
589
590 pub fn get_average_step_time(&self) -> Duration {
591 if self.step_durations.is_empty() {
592 Duration::from_secs(0)
593 } else {
594 self.step_durations.iter().sum::<Duration>() / self.step_durations.len() as u32
595 }
596 }
597
598 pub fn get_average_parallel_time(&self) -> Duration {
599 if self.parallel_durations.is_empty() {
600 Duration::from_secs(0)
601 } else {
602 self.parallel_durations.iter().sum::<Duration>() / self.parallel_durations.len() as u32
603 }
604 }
605
606 pub fn get_average_sequential_time(&self) -> Duration {
607 if self.sequential_durations.is_empty() {
608 Duration::from_secs(0)
609 } else {
610 self.sequential_durations.iter().sum::<Duration>()
611 / self.sequential_durations.len() as u32
612 }
613 }
614
615 pub fn get_memory_percentile(&self, percentile: f64) -> u64 {
616 if self.memory_usage_samples.is_empty() {
617 return 0;
618 }
619
620 let mut sorted = self.memory_usage_samples.clone();
621 sorted.sort();
622 let index = ((sorted.len() as f64 - 1.0) * percentile / 100.0).round() as usize;
623 sorted[index.min(sorted.len() - 1)]
624 }
625
626 pub fn get_cpu_percentile(&self, percentile: f64) -> f64 {
627 if self.cpu_usage_samples.is_empty() {
628 return 0.0;
629 }
630
631 let mut sorted = self.cpu_usage_samples.clone();
632 sorted.sort_by(|a, b| a.partial_cmp(b).unwrap());
633 let index = ((sorted.len() as f64 - 1.0) * percentile / 100.0).round() as usize;
634 sorted[index.min(sorted.len() - 1)]
635 }
636
637 pub fn get_primary_bottleneck(&self) -> Option<BottleneckType> {
638 self.bottlenecks
639 .iter()
640 .max_by_key(|(_, count)| *count)
641 .map(|(bottleneck, _)| *bottleneck)
642 }
643
644 pub fn get_error_rate(&self) -> f64 {
645 let total_errors: usize = self.error_counts.values().sum();
646 let total_executions = self.step_durations.len();
647
648 if total_executions == 0 {
649 0.0
650 } else {
651 total_errors as f64 / total_executions as f64
652 }
653 }
654}
655
/// Tracks the latest and peak memory/CPU readings reported during execution.
#[derive(Debug)]
pub struct LocalResourceMonitor {
    // Most recent memory reading (units set by callers — presumably bytes).
    current_memory_usage: u64,
    // Most recent CPU reading (scale set by callers).
    current_cpu_usage: f64,
    // Highest memory reading seen so far.
    peak_memory_usage: u64,
    // Highest CPU reading seen so far.
    peak_cpu_usage: f64,
}

impl Default for LocalResourceMonitor {
    /// Same as [`LocalResourceMonitor::new`].
    fn default() -> Self {
        LocalResourceMonitor::new()
    }
}

impl LocalResourceMonitor {
    /// Creates a monitor with all readings zeroed.
    pub fn new() -> Self {
        LocalResourceMonitor {
            current_memory_usage: 0,
            current_cpu_usage: 0.0,
            peak_memory_usage: 0,
            peak_cpu_usage: 0.0,
        }
    }

    /// The most recently reported memory usage.
    pub fn get_memory_usage(&self) -> u64 {
        self.current_memory_usage
    }

    /// The most recently reported CPU usage.
    pub fn get_cpu_usage(&self) -> f64 {
        self.current_cpu_usage
    }

    /// Stores a new memory reading and raises the peak if it was exceeded.
    pub fn update_memory_usage(&mut self, usage: u64) {
        self.current_memory_usage = usage;
        self.peak_memory_usage = self.peak_memory_usage.max(usage);
    }

    /// Stores a new CPU reading and raises the peak if it was exceeded.
    pub fn update_cpu_usage(&mut self, usage: f64) {
        self.current_cpu_usage = usage;
        self.peak_cpu_usage = self.peak_cpu_usage.max(usage);
    }
}
705
/// Thresholds steering local adaptive-execution decisions.
#[derive(Debug, Clone)]
pub struct LocalAdaptiveConfig {
    // Performance-degradation factor that triggers adaptation.
    pub performance_threshold: f64,
    // Memory ceiling before adaptation (presumably bytes — 1 GiB default).
    pub memory_threshold: u64,
    // CPU-usage ceiling before adaptation.
    pub cpu_threshold: f64,
    // Error-rate ceiling before adaptation.
    pub error_rate_threshold: f64,
    // Steps/groups between re-optimization checks.
    pub reoptimization_interval: usize,
    // Minimum group size that warrants parallel execution.
    pub parallel_threshold: usize,
    // Minimum group size that warrants streaming execution.
    pub streaming_threshold: usize,
    // Latency ceiling (presumably milliseconds — confirm against consumers).
    pub latency_threshold: u128,
    // Batch size used by the hybrid strategy.
    pub hybrid_batch_size: usize,
    // Delay between hybrid batches, in milliseconds.
    pub batch_delay_ms: u64,
}

impl Default for LocalAdaptiveConfig {
    fn default() -> Self {
        LocalAdaptiveConfig {
            performance_threshold: 1.5,
            memory_threshold: 1024 * 1024 * 1024,
            cpu_threshold: 0.8,
            error_rate_threshold: 0.1,
            reoptimization_interval: 5,
            parallel_threshold: 3,
            streaming_threshold: 10,
            latency_threshold: 1000,
            hybrid_batch_size: 3,
            batch_delay_ms: 50,
        }
    }
}
737
/// Execution strategy the adaptive planner can choose for a step group.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AdaptiveExecutionStrategy {
    // Execute all steps of a group concurrently.
    Parallel,
    // Execute steps one after another.
    Sequential,
    // Mix of parallel batches with sequential hand-off.
    Hybrid,
    // Stream results as they become available.
    Streaming,
}
746
/// Resource dimension identified as the limiting factor of an execution.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)]
pub enum BottleneckType {
    // Dominated by network round trips.
    NetworkLatency,
    // Dominated by memory pressure.
    MemoryUsage,
    // Dominated by CPU saturation.
    CpuUsage,
    // Dominated by disk I/O.
    DiskIo,
}
755
/// Observed metrics for one executed step group.
#[derive(Debug, Clone)]
pub struct GroupExecutionData {
    // Number of steps in the group.
    pub group_size: usize,
    // Wall time the group took.
    pub duration: Duration,
    // Memory consumed (units set by callers — presumably bytes).
    pub memory_used: u64,
    // CPU consumed (scale set by callers).
    pub cpu_used: f64,
}
764
/// Per-step execution metrics used by the enhanced performance monitoring.
#[derive(Debug, Clone)]
pub struct EnhancedStepResult {
    // Identifier of the plan step.
    pub step_id: String,
    // Wall time spent executing the step.
    pub execution_time: Duration,
    // Memory consumed (units set by callers — presumably bytes).
    pub memory_used: u64,
    // Size of the produced result (units not established here).
    pub result_size: usize,
    // Whether the step succeeded.
    pub success: bool,
    // Error description when the step failed.
    pub error_message: Option<String>,
    // Remote service response time, when a service was involved.
    pub service_response_time: Option<Duration>,
    // Whether the result was served from cache.
    pub cache_hit: bool,
}
777
/// Controls whether and when a running plan is re-optimized mid-execution.
#[derive(Debug, Clone)]
pub struct ReoptimizationConfig {
    // Master switch for mid-flight re-optimization.
    pub enable_reoptimization: bool,
    // Degradation factor that triggers re-optimization.
    pub reoptimization_threshold: f64,
    // Smallest group size worth re-optimizing.
    pub minimum_group_size: usize,
    // Memory-pressure fraction that triggers re-optimization.
    pub memory_pressure_threshold: f64,
    // CPU-pressure fraction that triggers re-optimization.
    pub cpu_pressure_threshold: f64,
    // Error-rate fraction that triggers re-optimization.
    pub error_rate_threshold: f64,
}

impl Default for ReoptimizationConfig {
    /// Defaults: enabled, 1.5x degradation trigger, 0.8 resource-pressure
    /// triggers, 10% error-rate trigger, groups of at least 2 steps.
    fn default() -> Self {
        ReoptimizationConfig {
            enable_reoptimization: true,
            reoptimization_threshold: 1.5,
            minimum_group_size: 2,
            memory_pressure_threshold: 0.8,
            cpu_pressure_threshold: 0.8,
            error_rate_threshold: 0.1,
        }
    }
}