1use crate::ir::{KnowledgeBase, Predicate, Rule, Term};
29use crate::proof_storage::{ProofFragment, ProofFragmentRef};
30use crate::reasoning::{Proof, Substitution};
31use async_trait::async_trait;
32use ipfrs_core::Cid;
33use serde::{Deserialize, Serialize};
34use std::collections::{HashMap, HashSet};
35use std::sync::Arc;
36use thiserror::Error;
37
38#[derive(Debug, Error)]
40pub enum RemoteReasoningError {
41 #[error("Network error: {0}")]
42 NetworkError(String),
43
44 #[error("Timeout waiting for remote response")]
45 Timeout,
46
47 #[error("Invalid response from peer: {0}")]
48 InvalidResponse(String),
49
50 #[error("Peer not found: {0}")]
51 PeerNotFound(String),
52
53 #[error("No peers available for query")]
54 NoPeersAvailable,
55
56 #[error("Serialization error: {0}")]
57 SerializationError(String),
58
59 #[error("Remote query failed: {0}")]
60 QueryFailed(String),
61}
62
63#[derive(Debug, Clone, Serialize, Deserialize)]
65pub struct QueryRequest {
66 pub predicate_name: String,
68
69 pub ground_args: Vec<String>,
71
72 pub max_results: usize,
74
75 pub max_depth: usize,
77
78 pub request_id: String,
80}
81
82#[derive(Debug, Clone, Serialize, Deserialize)]
84pub struct QueryResponse {
85 pub request_id: String,
87
88 pub predicates: Vec<Predicate>,
90
91 pub rules: Vec<Rule>,
93
94 pub proof_fragments: Vec<ProofFragmentRef>,
96
97 pub peer_id: String,
99
100 pub has_more: bool,
102
103 pub continuation_token: Option<String>,
105}
106
107#[derive(Debug, Clone, Serialize, Deserialize)]
109pub struct FactDiscoveryRequest {
110 pub predicate_name: String,
112
113 pub arg_patterns: Vec<Option<String>>,
115
116 pub max_hops: usize,
118
119 pub ttl: u32,
121
122 pub exclude_peers: HashSet<String>,
124}
125
126#[derive(Debug, Clone, Serialize, Deserialize)]
128pub struct FactDiscoveryResponse {
129 pub facts: Vec<Predicate>,
131
132 pub sources: HashMap<usize, String>, pub peers_queried: usize,
137
138 pub hops: HashMap<usize, usize>, }
141
142#[derive(Debug, Clone, Serialize, Deserialize)]
144pub struct IncrementalLoadRequest {
145 pub predicate_name: String,
147
148 pub batch_size: usize,
150
151 pub offset: usize,
153
154 pub filter: Option<HashMap<String, String>>,
156}
157
158#[derive(Debug, Clone, Serialize, Deserialize)]
160pub struct IncrementalLoadResponse {
161 pub batch: Vec<Predicate>,
163
164 pub total_count: usize,
166
167 pub next_offset: Option<usize>,
169
170 pub is_last: bool,
172}
173
174#[derive(Debug, Clone, Serialize, Deserialize)]
176pub struct GoalResolutionRequest {
177 pub goal: Predicate,
179
180 pub substitution: HashMap<String, Term>,
182
183 pub depth: usize,
185
186 pub requester: String,
188
189 pub request_id: String,
191}
192
193#[derive(Debug, Clone, Serialize, Deserialize)]
195pub struct GoalResolutionResponse {
196 pub request_id: String,
198
199 pub solved: bool,
201
202 pub solutions: Vec<HashMap<String, Term>>,
204
205 pub proof: Option<Proof>,
207
208 pub proof_fragments: Vec<ProofFragmentRef>,
210}
211
212#[async_trait]
214pub trait RemoteKnowledgeProvider: Send + Sync {
215 async fn query_predicate(
217 &self,
218 request: QueryRequest,
219 ) -> Result<QueryResponse, RemoteReasoningError>;
220
221 async fn discover_facts(
223 &self,
224 request: FactDiscoveryRequest,
225 ) -> Result<FactDiscoveryResponse, RemoteReasoningError>;
226
227 async fn load_incremental(
229 &self,
230 request: IncrementalLoadRequest,
231 ) -> Result<IncrementalLoadResponse, RemoteReasoningError>;
232
233 async fn resolve_goal(
235 &self,
236 request: GoalResolutionRequest,
237 ) -> Result<GoalResolutionResponse, RemoteReasoningError>;
238
239 async fn get_available_peers(&self) -> Result<Vec<String>, RemoteReasoningError>;
241}
242
243pub struct DistributedGoalResolver {
245 local_kb: Arc<KnowledgeBase>,
247
248 remote_provider: Option<Arc<dyn RemoteKnowledgeProvider>>,
250
251 max_depth: usize,
253
254 timeout_ms: u64,
256
257 remote_fact_cache: HashMap<String, Vec<Predicate>>,
259}
260
261impl DistributedGoalResolver {
262 pub fn new(local_kb: Arc<KnowledgeBase>) -> Self {
264 Self {
265 local_kb,
266 remote_provider: None,
267 max_depth: 10,
268 timeout_ms: 5000,
269 remote_fact_cache: HashMap::new(),
270 }
271 }
272
273 pub fn with_provider(mut self, provider: Arc<dyn RemoteKnowledgeProvider>) -> Self {
275 self.remote_provider = Some(provider);
276 self
277 }
278
279 pub fn with_max_depth(mut self, max_depth: usize) -> Self {
281 self.max_depth = max_depth;
282 self
283 }
284
285 pub fn with_timeout(mut self, timeout_ms: u64) -> Self {
287 self.timeout_ms = timeout_ms;
288 self
289 }
290
291 pub async fn resolve(
293 &mut self,
294 goal: &Predicate,
295 substitution: &Substitution,
296 ) -> Result<Vec<Substitution>, RemoteReasoningError> {
297 let local_solutions = self.resolve_local(goal, substitution);
299
300 if !local_solutions.is_empty() {
301 return Ok(local_solutions);
302 }
303
304 if let Some(provider) = self.remote_provider.clone() {
306 let remote_solutions = self.resolve_remote(goal, substitution, &provider).await?;
307 Ok(remote_solutions)
308 } else {
309 Ok(Vec::new())
310 }
311 }
312
313 fn resolve_local(&self, goal: &Predicate, _substitution: &Substitution) -> Vec<Substitution> {
315 let facts = self.local_kb.get_predicates(&goal.name);
317
318 let mut solutions = Vec::new();
319 for fact in facts {
320 if let Some(subst) =
321 crate::reasoning::unify_predicates(goal, fact, &Substitution::new())
322 {
323 solutions.push(subst);
324 }
325 }
326
327 solutions
328 }
329
330 async fn resolve_remote(
332 &mut self,
333 goal: &Predicate,
334 substitution: &Substitution,
335 provider: &Arc<dyn RemoteKnowledgeProvider>,
336 ) -> Result<Vec<Substitution>, RemoteReasoningError> {
337 let request = GoalResolutionRequest {
339 goal: goal.clone(),
340 substitution: substitution.clone(),
341 depth: 0,
342 requester: "local".to_string(),
343 request_id: uuid::Uuid::new_v4().to_string(),
344 };
345
346 let response = provider.resolve_goal(request).await?;
348
349 Ok(response.solutions)
351 }
352
353 pub async fn prefetch_facts(
355 &mut self,
356 predicate_name: &str,
357 ) -> Result<usize, RemoteReasoningError> {
358 let Some(provider) = &self.remote_provider else {
359 return Ok(0);
360 };
361
362 let request = FactDiscoveryRequest {
364 predicate_name: predicate_name.to_string(),
365 arg_patterns: Vec::new(),
366 max_hops: 3,
367 ttl: 30,
368 exclude_peers: HashSet::new(),
369 };
370
371 let response = provider.discover_facts(request).await?;
373
374 let count = response.facts.len();
376 self.remote_fact_cache
377 .insert(predicate_name.to_string(), response.facts);
378
379 Ok(count)
380 }
381
382 pub fn get_cached_facts(&self, predicate_name: &str) -> Option<&[Predicate]> {
384 self.remote_fact_cache
385 .get(predicate_name)
386 .map(|v| v.as_slice())
387 }
388
389 pub fn clear_cache(&mut self) {
391 self.remote_fact_cache.clear();
392 }
393}
394
395pub struct DistributedProofAssembler {
397 remote_provider: Arc<dyn RemoteKnowledgeProvider>,
399
400 fragment_cache: HashMap<Cid, ProofFragment>,
402
403 #[allow(dead_code)]
405 max_depth: usize,
406}
407
408impl DistributedProofAssembler {
409 pub fn new(remote_provider: Arc<dyn RemoteKnowledgeProvider>) -> Self {
411 Self {
412 remote_provider,
413 fragment_cache: HashMap::new(),
414 max_depth: 100,
415 }
416 }
417
418 pub async fn assemble_proof(
420 &mut self,
421 goal: &Predicate,
422 ) -> Result<Option<Proof>, RemoteReasoningError> {
423 let request = GoalResolutionRequest {
425 goal: goal.clone(),
426 substitution: HashMap::new(),
427 depth: 0,
428 requester: "local".to_string(),
429 request_id: uuid::Uuid::new_v4().to_string(),
430 };
431
432 let response = self.remote_provider.resolve_goal(request).await?;
433
434 if response.solved {
435 Ok(response.proof)
436 } else {
437 Ok(None)
438 }
439 }
440
441 pub async fn fetch_fragment(
443 &mut self,
444 cid: Cid,
445 ) -> Result<ProofFragment, RemoteReasoningError> {
446 if let Some(fragment) = self.fragment_cache.get(&cid) {
448 return Ok(fragment.clone());
449 }
450
451 Err(RemoteReasoningError::NetworkError(
454 "Fragment fetch not yet implemented".to_string(),
455 ))
456 }
457}
458
459pub struct MockRemoteKnowledgeProvider {
461 mock_kb: Arc<KnowledgeBase>,
463}
464
465impl MockRemoteKnowledgeProvider {
466 pub fn new(mock_kb: Arc<KnowledgeBase>) -> Self {
468 Self { mock_kb }
469 }
470}
471
472#[async_trait]
473impl RemoteKnowledgeProvider for MockRemoteKnowledgeProvider {
474 async fn query_predicate(
475 &self,
476 request: QueryRequest,
477 ) -> Result<QueryResponse, RemoteReasoningError> {
478 let predicates = self
479 .mock_kb
480 .get_predicates(&request.predicate_name)
481 .into_iter()
482 .take(request.max_results)
483 .cloned()
484 .collect();
485
486 let rules = self
487 .mock_kb
488 .get_rules(&request.predicate_name)
489 .into_iter()
490 .take(request.max_results)
491 .cloned()
492 .collect();
493
494 Ok(QueryResponse {
495 request_id: request.request_id,
496 predicates,
497 rules,
498 proof_fragments: Vec::new(),
499 peer_id: "mock_peer".to_string(),
500 has_more: false,
501 continuation_token: None,
502 })
503 }
504
505 async fn discover_facts(
506 &self,
507 request: FactDiscoveryRequest,
508 ) -> Result<FactDiscoveryResponse, RemoteReasoningError> {
509 let facts: Vec<Predicate> = self
510 .mock_kb
511 .get_predicates(&request.predicate_name)
512 .into_iter()
513 .cloned()
514 .collect();
515
516 let sources: HashMap<usize, String> = (0..facts.len())
517 .map(|i| (i, "mock_peer".to_string()))
518 .collect();
519
520 let hops: HashMap<usize, usize> = (0..facts.len()).map(|i| (i, 0)).collect();
521
522 Ok(FactDiscoveryResponse {
523 facts,
524 sources,
525 peers_queried: 1,
526 hops,
527 })
528 }
529
530 async fn load_incremental(
531 &self,
532 request: IncrementalLoadRequest,
533 ) -> Result<IncrementalLoadResponse, RemoteReasoningError> {
534 let all_facts: Vec<Predicate> = self
535 .mock_kb
536 .get_predicates(&request.predicate_name)
537 .into_iter()
538 .cloned()
539 .collect();
540
541 let total_count = all_facts.len();
542 let start = request.offset;
543 let end = (start + request.batch_size).min(total_count);
544
545 let batch = all_facts[start..end].to_vec();
546 let is_last = end >= total_count;
547 let next_offset = if is_last { None } else { Some(end) };
548
549 Ok(IncrementalLoadResponse {
550 batch,
551 total_count,
552 next_offset,
553 is_last,
554 })
555 }
556
557 async fn resolve_goal(
558 &self,
559 request: GoalResolutionRequest,
560 ) -> Result<GoalResolutionResponse, RemoteReasoningError> {
561 let facts = self.mock_kb.get_predicates(&request.goal.name);
563 let mut solutions = Vec::new();
564
565 for fact in facts {
566 if let Some(subst) =
567 crate::reasoning::unify_predicates(&request.goal, fact, &Substitution::new())
568 {
569 solutions.push(subst);
570 }
571 }
572
573 let solved = !solutions.is_empty();
574 let proof = if solved {
575 Some(Proof::fact(request.goal.clone()))
576 } else {
577 None
578 };
579
580 Ok(GoalResolutionResponse {
581 request_id: request.request_id,
582 solved,
583 solutions,
584 proof,
585 proof_fragments: Vec::new(),
586 })
587 }
588
589 async fn get_available_peers(&self) -> Result<Vec<String>, RemoteReasoningError> {
590 Ok(vec!["mock_peer".to_string()])
591 }
592}
593
594#[cfg(test)]
595mod tests {
596 use super::*;
597 use crate::ir::Constant;
598
599 #[tokio::test]
600 async fn test_query_request_serialization() {
601 let request = QueryRequest {
602 predicate_name: "parent".to_string(),
603 ground_args: vec!["Alice".to_string()],
604 max_results: 10,
605 max_depth: 5,
606 request_id: "test_123".to_string(),
607 };
608
609 let json = serde_json::to_string(&request).unwrap();
610 let decoded: QueryRequest = serde_json::from_str(&json).unwrap();
611
612 assert_eq!(request.predicate_name, decoded.predicate_name);
613 assert_eq!(request.ground_args, decoded.ground_args);
614 }
615
616 #[tokio::test]
617 async fn test_mock_provider_query() {
618 let mut kb = KnowledgeBase::new();
619 kb.add_fact(Predicate::new(
620 "parent".to_string(),
621 vec![
622 Term::Const(Constant::String("Alice".to_string())),
623 Term::Const(Constant::String("Bob".to_string())),
624 ],
625 ));
626
627 let provider = MockRemoteKnowledgeProvider::new(Arc::new(kb));
628
629 let request = QueryRequest {
630 predicate_name: "parent".to_string(),
631 ground_args: vec![],
632 max_results: 10,
633 max_depth: 5,
634 request_id: "test_123".to_string(),
635 };
636
637 let response = provider.query_predicate(request).await.unwrap();
638 assert_eq!(response.predicates.len(), 1);
639 assert_eq!(response.predicates[0].name, "parent");
640 }
641
642 #[tokio::test]
643 async fn test_distributed_resolver() {
644 let mut local_kb = KnowledgeBase::new();
645 local_kb.add_fact(Predicate::new(
646 "parent".to_string(),
647 vec![
648 Term::Const(Constant::String("Alice".to_string())),
649 Term::Const(Constant::String("Bob".to_string())),
650 ],
651 ));
652
653 let mut resolver = DistributedGoalResolver::new(Arc::new(local_kb));
654
655 let goal = Predicate::new(
656 "parent".to_string(),
657 vec![
658 Term::Const(Constant::String("Alice".to_string())),
659 Term::Var("X".to_string()),
660 ],
661 );
662
663 let solutions = resolver.resolve(&goal, &Substitution::new()).await.unwrap();
664 assert!(!solutions.is_empty());
665 }
666
667 #[tokio::test]
668 async fn test_fact_discovery() {
669 let mut kb = KnowledgeBase::new();
670 kb.add_fact(Predicate::new(
671 "parent".to_string(),
672 vec![
673 Term::Const(Constant::String("Alice".to_string())),
674 Term::Const(Constant::String("Bob".to_string())),
675 ],
676 ));
677 kb.add_fact(Predicate::new(
678 "parent".to_string(),
679 vec![
680 Term::Const(Constant::String("Bob".to_string())),
681 Term::Const(Constant::String("Charlie".to_string())),
682 ],
683 ));
684
685 let provider = MockRemoteKnowledgeProvider::new(Arc::new(kb));
686
687 let request = FactDiscoveryRequest {
688 predicate_name: "parent".to_string(),
689 arg_patterns: vec![],
690 max_hops: 3,
691 ttl: 30,
692 exclude_peers: HashSet::new(),
693 };
694
695 let response = provider.discover_facts(request).await.unwrap();
696 assert_eq!(response.facts.len(), 2);
697 assert_eq!(response.peers_queried, 1);
698 }
699
700 #[tokio::test]
701 async fn test_incremental_loading() {
702 let mut kb = KnowledgeBase::new();
703 for i in 0..10 {
704 kb.add_fact(Predicate::new(
705 "number".to_string(),
706 vec![Term::Const(Constant::Int(i))],
707 ));
708 }
709
710 let provider = MockRemoteKnowledgeProvider::new(Arc::new(kb));
711
712 let request = IncrementalLoadRequest {
714 predicate_name: "number".to_string(),
715 batch_size: 3,
716 offset: 0,
717 filter: None,
718 };
719
720 let response = provider.load_incremental(request).await.unwrap();
721 assert_eq!(response.batch.len(), 3);
722 assert_eq!(response.total_count, 10);
723 assert!(!response.is_last);
724 assert_eq!(response.next_offset, Some(3));
725 }
726
727 #[tokio::test]
728 async fn test_goal_resolution() {
729 let mut kb = KnowledgeBase::new();
730 kb.add_fact(Predicate::new(
731 "parent".to_string(),
732 vec![
733 Term::Const(Constant::String("Alice".to_string())),
734 Term::Const(Constant::String("Bob".to_string())),
735 ],
736 ));
737
738 let provider = MockRemoteKnowledgeProvider::new(Arc::new(kb));
739
740 let goal = Predicate::new(
741 "parent".to_string(),
742 vec![
743 Term::Const(Constant::String("Alice".to_string())),
744 Term::Var("X".to_string()),
745 ],
746 );
747
748 let request = GoalResolutionRequest {
749 goal,
750 substitution: HashMap::new(),
751 depth: 0,
752 requester: "test".to_string(),
753 request_id: "test_123".to_string(),
754 };
755
756 let response = provider.resolve_goal(request).await.unwrap();
757 assert!(response.solved);
758 assert!(!response.solutions.is_empty());
759 assert!(response.proof.is_some());
760 }
761}