1use crate::core::Result;
15use std::collections::HashMap;
16use std::future::Future;
17use std::pin::Pin;
18
19use async_trait::async_trait;
20
/// Optional key/value metadata stored next to a vector.
///
/// `None` means the vector has no metadata attached.
pub type VectorMetadata = Option<HashMap<String, String>>;

/// A batch of vectors for bulk insertion; every entry is laid out as
/// `(id, vector, metadata)`, mirroring the arguments of `add_vector`.
pub type VectorBatch = Vec<(String, Vec<f32>, VectorMetadata)>;
26
27pub trait Storage {
32 type Entity;
34 type Document;
36 type Chunk;
38 type Error: std::error::Error + Send + Sync + 'static;
40
41 fn store_entity(&mut self, entity: Self::Entity) -> Result<String>;
43
44 fn retrieve_entity(&self, id: &str) -> Result<Option<Self::Entity>>;
46
47 fn store_document(&mut self, document: Self::Document) -> Result<String>;
49
50 fn retrieve_document(&self, id: &str) -> Result<Option<Self::Document>>;
52
53 fn store_chunk(&mut self, chunk: Self::Chunk) -> Result<String>;
55
56 fn retrieve_chunk(&self, id: &str) -> Result<Option<Self::Chunk>>;
58
59 fn list_entities(&self) -> Result<Vec<String>>;
61
62 fn store_entities_batch(&mut self, entities: Vec<Self::Entity>) -> Result<Vec<String>>;
64}
65
66#[allow(async_fn_in_trait)]
71#[async_trait]
72pub trait AsyncStorage: Send + Sync {
73 type Entity: Send + Sync;
75 type Document: Send + Sync;
77 type Chunk: Send + Sync;
79 type Error: std::error::Error + Send + Sync + 'static;
81
82 async fn store_entity(&mut self, entity: Self::Entity) -> Result<String>;
84
85 async fn retrieve_entity(&self, id: &str) -> Result<Option<Self::Entity>>;
87
88 async fn store_document(&mut self, document: Self::Document) -> Result<String>;
90
91 async fn retrieve_document(&self, id: &str) -> Result<Option<Self::Document>>;
93
94 async fn store_chunk(&mut self, chunk: Self::Chunk) -> Result<String>;
96
97 async fn retrieve_chunk(&self, id: &str) -> Result<Option<Self::Chunk>>;
99
100 async fn list_entities(&self) -> Result<Vec<String>>;
102
103 async fn store_entities_batch(&mut self, entities: Vec<Self::Entity>) -> Result<Vec<String>>;
105
106 async fn health_check(&self) -> Result<bool> {
108 Ok(true)
109 }
110
111 async fn flush(&mut self) -> Result<()> {
113 Ok(())
114 }
115}
116
117pub trait Embedder {
122 type Error: std::error::Error + Send + Sync + 'static;
124
125 fn embed(&self, text: &str) -> Result<Vec<f32>>;
127
128 fn embed_batch(&self, texts: &[&str]) -> Result<Vec<Vec<f32>>>;
130
131 fn dimension(&self) -> usize;
133
134 fn is_ready(&self) -> bool;
136}
137
138#[allow(async_fn_in_trait)]
143#[async_trait]
144pub trait AsyncEmbedder: Send + Sync {
145 type Error: std::error::Error + Send + Sync + 'static;
147
148 async fn embed(&self, text: &str) -> Result<Vec<f32>>;
150
151 async fn embed_batch(&self, texts: &[&str]) -> Result<Vec<Vec<f32>>>;
153
154 async fn embed_batch_concurrent(
156 &self,
157 texts: &[&str],
158 max_concurrent: usize,
159 ) -> Result<Vec<Vec<f32>>> {
160 if max_concurrent <= 1 {
161 return self.embed_batch(texts).await;
162 }
163
164 let chunks: Vec<_> = texts.chunks(max_concurrent).collect();
165 let mut results = Vec::with_capacity(texts.len());
166
167 for chunk in chunks {
168 let batch_results = self.embed_batch(chunk).await?;
169 results.extend(batch_results);
170 }
171
172 Ok(results)
173 }
174
175 fn dimension(&self) -> usize;
177
178 async fn is_ready(&self) -> bool;
180
181 async fn health_check(&self) -> Result<bool> {
183 self.is_ready()
184 .await
185 .then_some(true)
186 .ok_or_else(|| crate::core::GraphRAGError::Retrieval {
187 message: "Embedding service health check failed".to_string(),
188 })
189 }
190}
191
192pub trait VectorStore {
197 type Error: std::error::Error + Send + Sync + 'static;
199
200 fn add_vector(&mut self, id: String, vector: Vec<f32>, metadata: VectorMetadata) -> Result<()>;
202
203 fn add_vectors_batch(&mut self, vectors: VectorBatch) -> Result<()>;
205
206 fn search(&self, query_vector: &[f32], k: usize) -> Result<Vec<SearchResult>>;
208
209 fn search_with_threshold(
211 &self,
212 query_vector: &[f32],
213 k: usize,
214 threshold: f32,
215 ) -> Result<Vec<SearchResult>>;
216
217 fn remove_vector(&mut self, id: &str) -> Result<bool>;
219
220 fn len(&self) -> usize;
222
223 fn is_empty(&self) -> bool;
225}
226
227#[allow(async_fn_in_trait)]
232#[async_trait]
233pub trait AsyncVectorStore: Send + Sync {
234 type Error: std::error::Error + Send + Sync + 'static;
236
237 async fn add_vector(
239 &mut self,
240 id: String,
241 vector: Vec<f32>,
242 metadata: VectorMetadata,
243 ) -> Result<()>;
244
245 async fn add_vectors_batch(&mut self, vectors: VectorBatch) -> Result<()>;
247
248 async fn add_vectors_batch_concurrent(
250 &mut self,
251 vectors: VectorBatch,
252 max_concurrent: usize,
253 ) -> Result<()> {
254 if max_concurrent <= 1 {
255 return self.add_vectors_batch(vectors).await;
256 }
257
258 for chunk in vectors.chunks(max_concurrent) {
259 self.add_vectors_batch(chunk.to_vec()).await?;
260 }
261
262 Ok(())
263 }
264
265 async fn search(&self, query_vector: &[f32], k: usize) -> Result<Vec<SearchResult>>;
267
268 async fn search_with_threshold(
270 &self,
271 query_vector: &[f32],
272 k: usize,
273 threshold: f32,
274 ) -> Result<Vec<SearchResult>>;
275
276 async fn search_batch(
278 &self,
279 query_vectors: &[Vec<f32>],
280 k: usize,
281 ) -> Result<Vec<Vec<SearchResult>>> {
282 let mut results = Vec::with_capacity(query_vectors.len());
283 for query in query_vectors {
284 let search_results = self.search(query, k).await?;
285 results.push(search_results);
286 }
287 Ok(results)
288 }
289
290 async fn remove_vector(&mut self, id: &str) -> Result<bool>;
292
293 async fn remove_vectors_batch(&mut self, ids: &[&str]) -> Result<Vec<bool>> {
295 let mut results = Vec::with_capacity(ids.len());
296 for id in ids {
297 let removed = self.remove_vector(id).await?;
298 results.push(removed);
299 }
300 Ok(results)
301 }
302
303 async fn len(&self) -> usize;
305
306 async fn is_empty(&self) -> bool {
308 self.len().await == 0
309 }
310
311 async fn health_check(&self) -> Result<bool> {
313 Ok(true)
314 }
315
316 async fn build_index(&mut self) -> Result<()> {
318 Ok(())
319 }
320}
321
/// A single hit returned by a vector-store search.
#[derive(Clone, Debug)]
pub struct SearchResult {
    /// Identifier of the matching vector.
    pub id: String,
    /// Distance between the query and this vector, as computed by the store.
    pub distance: f32,
    /// Metadata stored alongside the vector, if any.
    pub metadata: Option<HashMap<String, String>>,
}
332
333pub trait EntityExtractor {
338 type Entity;
340 type Error: std::error::Error + Send + Sync + 'static;
342
343 fn extract(&self, text: &str) -> Result<Vec<Self::Entity>>;
345
346 fn extract_with_confidence(&self, text: &str) -> Result<Vec<(Self::Entity, f32)>>;
348
349 fn set_confidence_threshold(&mut self, threshold: f32);
351}
352
353#[allow(async_fn_in_trait)]
358#[async_trait]
359pub trait AsyncEntityExtractor: Send + Sync {
360 type Entity: Send + Sync;
362 type Error: std::error::Error + Send + Sync + 'static;
364
365 async fn extract(&self, text: &str) -> Result<Vec<Self::Entity>>;
367
368 async fn extract_with_confidence(&self, text: &str) -> Result<Vec<(Self::Entity, f32)>>;
370
371 async fn extract_batch(&self, texts: &[&str]) -> Result<Vec<Vec<Self::Entity>>> {
373 let mut results = Vec::with_capacity(texts.len());
374 for text in texts {
375 let entities = self.extract(text).await?;
376 results.push(entities);
377 }
378 Ok(results)
379 }
380
381 async fn extract_batch_concurrent(
383 &self,
384 texts: &[&str],
385 max_concurrent: usize,
386 ) -> Result<Vec<Vec<Self::Entity>>> {
387 if max_concurrent <= 1 {
388 return self.extract_batch(texts).await;
389 }
390
391 let chunks: Vec<_> = texts.chunks(max_concurrent).collect();
392 let mut results = Vec::with_capacity(texts.len());
393
394 for chunk in chunks {
395 let batch_results = self.extract_batch(chunk).await?;
396 results.extend(batch_results);
397 }
398
399 Ok(results)
400 }
401
402 async fn set_confidence_threshold(&mut self, threshold: f32);
404
405 async fn get_confidence_threshold(&self) -> f32;
407
408 async fn health_check(&self) -> Result<bool> {
410 Ok(true)
411 }
412}
413
414pub trait Retriever {
419 type Query;
421 type Result;
423 type Error: std::error::Error + Send + Sync + 'static;
425
426 fn search(&self, query: Self::Query, k: usize) -> Result<Vec<Self::Result>>;
428
429 fn search_with_context(
431 &self,
432 query: Self::Query,
433 context: &str,
434 k: usize,
435 ) -> Result<Vec<Self::Result>>;
436
437 fn update(&mut self, content: Vec<String>) -> Result<()>;
439}
440
441#[allow(async_fn_in_trait)]
446#[async_trait]
447pub trait AsyncRetriever: Send + Sync {
448 type Query: Send + Sync;
450 type Result: Send + Sync;
452 type Error: std::error::Error + Send + Sync + 'static;
454
455 async fn search(&self, query: Self::Query, k: usize) -> Result<Vec<Self::Result>>;
457
458 async fn search_with_context(
460 &self,
461 query: Self::Query,
462 context: &str,
463 k: usize,
464 ) -> Result<Vec<Self::Result>>;
465
466 async fn search_batch(
468 &self,
469 queries: Vec<Self::Query>,
470 k: usize,
471 ) -> Result<Vec<Vec<Self::Result>>> {
472 let mut results = Vec::with_capacity(queries.len());
473 for query in queries {
474 let search_results = self.search(query, k).await?;
475 results.push(search_results);
476 }
477 Ok(results)
478 }
479
480 async fn update(&mut self, content: Vec<String>) -> Result<()>;
482
483 async fn update_batch(&mut self, content_batches: Vec<Vec<String>>) -> Result<()> {
485 for batch in content_batches {
486 self.update(batch).await?;
487 }
488 Ok(())
489 }
490
491 async fn refresh_index(&mut self) -> Result<()> {
493 Ok(())
494 }
495
496 async fn health_check(&self) -> Result<bool> {
498 Ok(true)
499 }
500
501 async fn get_stats(&self) -> Result<RetrievalStats> {
503 Ok(RetrievalStats::default())
504 }
505}
506
/// Counters describing a retriever's workload; `Default` is all zeros.
#[derive(Clone, Debug, Default)]
pub struct RetrievalStats {
    /// Number of queries served.
    pub total_queries: u64,
    /// Mean response time in milliseconds.
    pub average_response_time_ms: f64,
    /// Size of the underlying index (unit is implementation-defined).
    pub index_size: usize,
    /// Cache hit rate — presumably a fraction in `[0, 1]`; confirm with implementors.
    pub cache_hit_rate: f64,
}
519
520pub trait LanguageModel {
525 type Error: std::error::Error + Send + Sync + 'static;
527
528 fn complete(&self, prompt: &str) -> Result<String>;
530
531 fn complete_with_params(&self, prompt: &str, params: GenerationParams) -> Result<String>;
533
534 fn is_available(&self) -> bool;
536
537 fn model_info(&self) -> ModelInfo;
539}
540
541#[allow(async_fn_in_trait)]
546#[async_trait]
547pub trait AsyncLanguageModel: Send + Sync {
548 type Error: std::error::Error + Send + Sync + 'static;
550
551 async fn complete(&self, prompt: &str) -> Result<String>;
553
554 async fn complete_with_params(&self, prompt: &str, params: GenerationParams) -> Result<String>;
556
557 async fn complete_batch(&self, prompts: &[&str]) -> Result<Vec<String>> {
559 let mut results = Vec::with_capacity(prompts.len());
560 for prompt in prompts {
561 let completion = self.complete(prompt).await?;
562 results.push(completion);
563 }
564 Ok(results)
565 }
566
567 async fn complete_batch_concurrent(
569 &self,
570 prompts: &[&str],
571 max_concurrent: usize,
572 ) -> Result<Vec<String>> {
573 if max_concurrent <= 1 {
574 return self.complete_batch(prompts).await;
575 }
576
577 let chunks: Vec<_> = prompts.chunks(max_concurrent).collect();
578 let mut results = Vec::with_capacity(prompts.len());
579
580 for chunk in chunks {
581 let batch_results = self.complete_batch(chunk).await?;
582 results.extend(batch_results);
583 }
584
585 Ok(results)
586 }
587
588 async fn complete_streaming(
590 &self,
591 prompt: &str,
592 ) -> Result<Pin<Box<dyn futures::Stream<Item = Result<String>> + Send>>> {
593 let result = self.complete(prompt).await?;
595 let stream = futures::stream::once(async move { Ok(result) });
596 Ok(Box::pin(stream))
597 }
598
599 async fn is_available(&self) -> bool;
601
602 async fn model_info(&self) -> ModelInfo;
604
605 async fn health_check(&self) -> Result<bool> {
607 self.is_available().await.then_some(true).ok_or_else(|| {
608 crate::core::GraphRAGError::Generation {
609 message: "Language model health check failed".to_string(),
610 }
611 })
612 }
613
614 async fn get_usage_stats(&self) -> Result<ModelUsageStats> {
616 Ok(ModelUsageStats::default())
617 }
618
619 async fn estimate_tokens(&self, prompt: &str) -> Result<usize> {
621 Ok(prompt.len() / 4)
623 }
624}
625
/// Usage counters for a language model; `Default` is all zeros.
#[derive(Clone, Debug, Default)]
pub struct ModelUsageStats {
    /// Number of requests issued.
    pub total_requests: u64,
    /// Number of tokens processed across all requests.
    pub total_tokens_processed: u64,
    /// Mean response time in milliseconds.
    pub average_response_time_ms: f64,
    /// Error rate — presumably a fraction in `[0, 1]`; confirm with implementors.
    pub error_rate: f64,
}
638
639#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
641pub struct GenerationParams {
642 pub max_tokens: Option<usize>,
644 pub temperature: Option<f32>,
646 pub top_p: Option<f32>,
648 pub stop_sequences: Option<Vec<String>>,
650}
651
652impl Default for GenerationParams {
653 fn default() -> Self {
654 Self {
655 max_tokens: Some(1000),
656 temperature: Some(0.7),
657 top_p: Some(0.9),
658 stop_sequences: None,
659 }
660 }
661}
662
/// Descriptive information reported by a language model.
#[derive(Clone, Debug)]
pub struct ModelInfo {
    /// Model name.
    pub name: String,
    /// Model version, when known.
    pub version: Option<String>,
    /// Maximum context length, when known (unit presumably tokens; confirm).
    pub max_context_length: Option<usize>,
    /// Whether the model supports streaming output.
    pub supports_streaming: bool,
}
675
676pub trait GraphStore {
681 type Node;
683 type Edge;
685 type Error: std::error::Error + Send + Sync + 'static;
687
688 fn add_node(&mut self, node: Self::Node) -> Result<String>;
690
691 fn add_edge(&mut self, from_id: &str, to_id: &str, edge: Self::Edge) -> Result<String>;
693
694 fn find_nodes(&self, criteria: &str) -> Result<Vec<Self::Node>>;
696
697 fn get_neighbors(&self, node_id: &str) -> Result<Vec<Self::Node>>;
699
700 fn traverse(&self, start_id: &str, max_depth: usize) -> Result<Vec<Self::Node>>;
702
703 fn stats(&self) -> GraphStats;
705}
706
707#[allow(async_fn_in_trait)]
712#[async_trait]
713pub trait AsyncGraphStore: Send + Sync {
714 type Node: Send + Sync;
716 type Edge: Send + Sync;
718 type Error: std::error::Error + Send + Sync + 'static;
720
721 async fn add_node(&mut self, node: Self::Node) -> Result<String>;
723
724 async fn add_nodes_batch(&mut self, nodes: Vec<Self::Node>) -> Result<Vec<String>> {
726 let mut ids = Vec::with_capacity(nodes.len());
727 for node in nodes {
728 let id = self.add_node(node).await?;
729 ids.push(id);
730 }
731 Ok(ids)
732 }
733
734 async fn add_edge(&mut self, from_id: &str, to_id: &str, edge: Self::Edge) -> Result<String>;
736
737 async fn add_edges_batch(
739 &mut self,
740 edges: Vec<(String, String, Self::Edge)>,
741 ) -> Result<Vec<String>> {
742 let mut ids = Vec::with_capacity(edges.len());
743 for (from_id, to_id, edge) in edges {
744 let id = self.add_edge(&from_id, &to_id, edge).await?;
745 ids.push(id);
746 }
747 Ok(ids)
748 }
749
750 async fn find_nodes(&self, criteria: &str) -> Result<Vec<Self::Node>>;
752
753 async fn find_nodes_batch(&self, criteria_list: &[&str]) -> Result<Vec<Vec<Self::Node>>> {
755 let mut results = Vec::with_capacity(criteria_list.len());
756 for criteria in criteria_list {
757 let nodes = self.find_nodes(criteria).await?;
758 results.push(nodes);
759 }
760 Ok(results)
761 }
762
763 async fn get_neighbors(&self, node_id: &str) -> Result<Vec<Self::Node>>;
765
766 async fn get_neighbors_batch(&self, node_ids: &[&str]) -> Result<Vec<Vec<Self::Node>>> {
768 let mut results = Vec::with_capacity(node_ids.len());
769 for node_id in node_ids {
770 let neighbors = self.get_neighbors(node_id).await?;
771 results.push(neighbors);
772 }
773 Ok(results)
774 }
775
776 async fn traverse(&self, start_id: &str, max_depth: usize) -> Result<Vec<Self::Node>>;
778
779 async fn traverse_batch(
781 &self,
782 start_ids: &[&str],
783 max_depth: usize,
784 ) -> Result<Vec<Vec<Self::Node>>> {
785 let mut results = Vec::with_capacity(start_ids.len());
786 for start_id in start_ids {
787 let traversal = self.traverse(start_id, max_depth).await?;
788 results.push(traversal);
789 }
790 Ok(results)
791 }
792
793 async fn stats(&self) -> GraphStats;
795
796 async fn health_check(&self) -> Result<bool> {
798 Ok(true)
799 }
800
801 async fn optimize(&mut self) -> Result<()> {
803 Ok(())
804 }
805
806 async fn export(&self) -> Result<Vec<u8>> {
808 Ok(Vec::new())
809 }
810
811 #[allow(clippy::disallowed_names)]
813 async fn import(&mut self, data: &[u8]) -> Result<()> {
814 let _ = data; Ok(())
816 }
817}
818
/// Summary statistics describing a graph store.
///
/// `Default` yields all-zero statistics, consistent with the other stats
/// structs in this module (`RetrievalStats`, `ModelUsageStats`).
#[derive(Debug, Clone, Default)]
pub struct GraphStats {
    /// Number of nodes.
    pub node_count: usize,
    /// Number of edges.
    pub edge_count: usize,
    /// Average node degree, as computed by the store.
    pub average_degree: f32,
    /// Maximum depth reported by the store (exact definition is implementation-defined).
    pub max_depth: usize,
}
831
832pub trait FunctionRegistry {
837 type Function;
839 type CallResult;
841 type Error: std::error::Error + Send + Sync + 'static;
843
844 fn register(&mut self, name: String, function: Self::Function) -> Result<()>;
846
847 fn call(&self, name: &str, args: &str) -> Result<Self::CallResult>;
849
850 fn list_functions(&self) -> Vec<String>;
852
853 fn has_function(&self, name: &str) -> bool;
855}
856
857#[allow(async_fn_in_trait)]
862#[async_trait]
863pub trait AsyncFunctionRegistry: Send + Sync {
864 type Function: Send + Sync;
866 type CallResult: Send + Sync;
868 type Error: std::error::Error + Send + Sync + 'static;
870
871 async fn register(&mut self, name: String, function: Self::Function) -> Result<()>;
873
874 async fn call(&self, name: &str, args: &str) -> Result<Self::CallResult>;
876
877 async fn call_batch(&self, calls: &[(&str, &str)]) -> Result<Vec<Self::CallResult>> {
879 let mut results = Vec::with_capacity(calls.len());
880 for (name, args) in calls {
881 let result = self.call(name, args).await?;
882 results.push(result);
883 }
884 Ok(results)
885 }
886
887 async fn list_functions(&self) -> Vec<String>;
889
890 async fn has_function(&self, name: &str) -> bool;
892
893 async fn get_function_info(&self, name: &str) -> Result<Option<FunctionInfo>>;
895
896 async fn health_check(&self) -> Result<bool> {
898 Ok(true)
899 }
900
901 async fn validate_args(&self, name: &str, args: &str) -> Result<bool> {
903 let _ = (name, args); Ok(true)
905 }
906}
907
908#[derive(Debug, Clone)]
910pub struct FunctionInfo {
911 pub name: String,
913 pub description: Option<String>,
915 pub parameters: Vec<ParameterInfo>,
917 pub return_type: Option<String>,
919}
920
/// Description of one parameter of a registered function.
#[derive(Clone, Debug)]
pub struct ParameterInfo {
    /// Parameter name.
    pub name: String,
    /// Parameter type as a free-form string.
    pub param_type: String,
    /// Human-readable description, when provided.
    pub description: Option<String>,
    /// Whether callers must supply this parameter.
    pub required: bool,
}
933
934pub trait ConfigProvider {
939 type Config;
941 type Error: std::error::Error + Send + Sync + 'static;
943
944 fn load(&self) -> Result<Self::Config>;
946
947 fn save(&self, config: &Self::Config) -> Result<()>;
949
950 fn validate(&self, config: &Self::Config) -> Result<()>;
952
953 fn default_config(&self) -> Self::Config;
955}
956
957#[allow(async_fn_in_trait)]
962#[async_trait]
963pub trait AsyncConfigProvider: Send + Sync {
964 type Config: Send + Sync;
966 type Error: std::error::Error + Send + Sync + 'static;
968
969 async fn load(&self) -> Result<Self::Config>;
971
972 async fn save(&self, config: &Self::Config) -> Result<()>;
974
975 async fn validate(&self, config: &Self::Config) -> Result<()>;
977
978 async fn default_config(&self) -> Self::Config;
980
981 async fn watch_changes(
983 &self,
984 ) -> Result<Pin<Box<dyn futures::Stream<Item = Result<Self::Config>> + Send + 'static>>>
985 where
986 Self::Config: 'static,
987 {
988 let stream = futures::stream::empty::<Result<Self::Config>>();
990 Ok(Box::pin(stream))
991 }
992
993 async fn reload(&self) -> Result<Self::Config> {
995 self.load().await
996 }
997
998 async fn health_check(&self) -> Result<bool> {
1000 Ok(true)
1001 }
1002}
1003
1004pub trait MetricsCollector {
1009 fn counter(&self, name: &str, value: u64, tags: Option<&[(&str, &str)]>);
1011
1012 fn gauge(&self, name: &str, value: f64, tags: Option<&[(&str, &str)]>);
1014
1015 fn histogram(&self, name: &str, value: f64, tags: Option<&[(&str, &str)]>);
1017
1018 fn timer(&self, name: &str) -> Timer;
1020}
1021
1022#[allow(async_fn_in_trait)]
1027#[async_trait]
1028pub trait AsyncMetricsCollector: Send + Sync {
1029 async fn counter(&self, name: &str, value: u64, tags: Option<&[(&str, &str)]>);
1031
1032 async fn gauge(&self, name: &str, value: f64, tags: Option<&[(&str, &str)]>);
1034
1035 async fn histogram(&self, name: &str, value: f64, tags: Option<&[(&str, &str)]>);
1037
1038 async fn record_batch(&self, metrics: &[MetricRecord]) {
1040 for metric in metrics {
1041 match metric {
1042 MetricRecord::Counter { name, value, tags } => {
1043 let tags_refs: Option<Vec<(&str, &str)>> = tags
1044 .as_ref()
1045 .map(|t| t.iter().map(|(k, v)| (k.as_str(), v.as_str())).collect());
1046 self.counter(name, *value, tags_refs.as_deref()).await;
1047 },
1048 MetricRecord::Gauge { name, value, tags } => {
1049 let tags_refs: Option<Vec<(&str, &str)>> = tags
1050 .as_ref()
1051 .map(|t| t.iter().map(|(k, v)| (k.as_str(), v.as_str())).collect());
1052 self.gauge(name, *value, tags_refs.as_deref()).await;
1053 },
1054 MetricRecord::Histogram { name, value, tags } => {
1055 let tags_refs: Option<Vec<(&str, &str)>> = tags
1056 .as_ref()
1057 .map(|t| t.iter().map(|(k, v)| (k.as_str(), v.as_str())).collect());
1058 self.histogram(name, *value, tags_refs.as_deref()).await;
1059 },
1060 }
1061 }
1062 }
1063
1064 async fn timer(&self, name: &str) -> AsyncTimer;
1066
1067 async fn health_check(&self) -> Result<bool> {
1069 Ok(true)
1070 }
1071
1072 async fn flush(&self) -> Result<()> {
1074 Ok(())
1075 }
1076}
1077
/// An owned metric sample, replayable through `AsyncMetricsCollector::record_batch`.
///
/// `tags` hold optional owned `(key, value)` pairs.
#[derive(Clone, Debug)]
pub enum MetricRecord {
    /// Counter increment.
    Counter {
        /// Metric name.
        name: String,
        /// Amount to add.
        value: u64,
        /// Optional `(key, value)` tags.
        tags: Option<Vec<(String, String)>>,
    },
    /// Gauge reading.
    Gauge {
        /// Metric name.
        name: String,
        /// Observed value.
        value: f64,
        /// Optional `(key, value)` tags.
        tags: Option<Vec<(String, String)>>,
    },
    /// Histogram observation.
    Histogram {
        /// Metric name.
        name: String,
        /// Observed value.
        value: f64,
        /// Optional `(key, value)` tags.
        tags: Option<Vec<(String, String)>>,
    },
}
1109
/// Elapsed-time measurement returned by `AsyncMetricsCollector::timer`.
///
/// The clock starts in [`AsyncTimer::new`] and is read by [`AsyncTimer::finish`].
/// `Debug` is derived so the public type can be logged and inspected.
#[derive(Debug)]
pub struct AsyncTimer {
    /// Label supplied at construction.
    name: String,
    /// Instant captured at construction.
    start: std::time::Instant,
}

impl AsyncTimer {
    /// Starts a timer labelled `name`.
    pub fn new(name: String) -> Self {
        Self {
            name,
            start: std::time::Instant::now(),
        }
    }

    /// Consumes the timer and returns the time elapsed since [`AsyncTimer::new`].
    ///
    /// Although declared `async`, the body awaits nothing; it only reads the clock.
    pub async fn finish(self) -> std::time::Duration {
        self.start.elapsed()
    }

    /// Label supplied at construction.
    pub fn name(&self) -> &str {
        &self.name
    }
}
1137
/// Synchronous elapsed-time measurement returned by `MetricsCollector::timer`.
///
/// The clock starts in [`Timer::new`] and is read by [`Timer::finish`]. The
/// label is exposed through [`Timer::name`], as [`AsyncTimer`] does, instead of
/// being stored as dead data.
#[derive(Debug)]
pub struct Timer {
    /// Label supplied at construction.
    name: String,
    /// Instant captured at construction.
    start: std::time::Instant,
}

impl Timer {
    /// Starts a timer labelled `name`.
    pub fn new(name: String) -> Self {
        Self {
            name,
            start: std::time::Instant::now(),
        }
    }

    /// Label supplied at construction.
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Consumes the timer and returns the time elapsed since [`Timer::new`].
    pub fn finish(self) -> std::time::Duration {
        self.start.elapsed()
    }
}
1161
1162pub trait Serializer {
1167 type Error: std::error::Error + Send + Sync + 'static;
1169
1170 fn serialize<T: serde::Serialize>(&self, data: &T) -> Result<String>;
1172
1173 fn deserialize<T: serde::de::DeserializeOwned>(&self, data: &str) -> Result<T>;
1175
1176 fn extension(&self) -> &'static str;
1178}
1179
1180#[allow(async_fn_in_trait)]
1185#[async_trait]
1186pub trait AsyncSerializer: Send + Sync {
1187 type Error: std::error::Error + Send + Sync + 'static;
1189
1190 async fn serialize<T: serde::Serialize + Send + Sync>(&self, data: &T) -> Result<String>;
1192
1193 async fn deserialize<T: serde::de::DeserializeOwned + Send + Sync>(
1195 &self,
1196 data: &str,
1197 ) -> Result<T>;
1198
1199 #[allow(clippy::disallowed_names)]
1201 async fn serialize_bytes<T: serde::Serialize + Send + Sync>(
1202 &self,
1203 data: &T,
1204 ) -> Result<Vec<u8>> {
1205 let string = self.serialize(data).await?;
1206 Ok(string.into_bytes())
1207 }
1208
1209 #[allow(clippy::disallowed_names)]
1211 async fn deserialize_bytes<T: serde::de::DeserializeOwned + Send + Sync>(
1212 &self,
1213 data: &[u8],
1214 ) -> Result<T> {
1215 let string = String::from_utf8(data.to_vec()).map_err(|e| {
1216 crate::core::GraphRAGError::Serialization {
1217 message: format!("Invalid UTF-8 data: {e}"),
1218 }
1219 })?;
1220 self.deserialize(&string).await
1221 }
1222
1223 #[allow(clippy::disallowed_names)]
1225 async fn serialize_batch<T: serde::Serialize + Send + Sync>(
1226 &self,
1227 data: &[T],
1228 ) -> Result<Vec<String>> {
1229 let mut results = Vec::with_capacity(data.len());
1230 for item in data {
1231 let serialized = self.serialize(item).await?;
1232 results.push(serialized);
1233 }
1234 Ok(results)
1235 }
1236
1237 fn extension(&self) -> &'static str;
1239
1240 async fn health_check(&self) -> Result<bool> {
1242 Ok(true)
1243 }
1244}
1245
1246pub mod sync_to_async {
1252 use super::*;
1253 use std::sync::Arc;
1254
1255 pub struct StorageAdapter<T>(pub Arc<tokio::sync::Mutex<T>>);
1257
1258 #[async_trait]
1259 impl<T> AsyncStorage for StorageAdapter<T>
1260 where
1261 T: Storage + Send + Sync + 'static,
1262 T::Entity: Send + Sync,
1263 T::Document: Send + Sync,
1264 T::Chunk: Send + Sync,
1265 {
1266 type Entity = T::Entity;
1268 type Document = T::Document;
1270 type Chunk = T::Chunk;
1272 type Error = T::Error;
1274
1275 async fn store_entity(&mut self, entity: Self::Entity) -> Result<String> {
1276 let mut storage = self.0.lock().await;
1277 storage.store_entity(entity)
1278 }
1279
1280 async fn retrieve_entity(&self, id: &str) -> Result<Option<Self::Entity>> {
1281 let storage = self.0.lock().await;
1282 storage.retrieve_entity(id)
1283 }
1284
1285 async fn store_document(&mut self, document: Self::Document) -> Result<String> {
1286 let mut storage = self.0.lock().await;
1287 storage.store_document(document)
1288 }
1289
1290 async fn retrieve_document(&self, id: &str) -> Result<Option<Self::Document>> {
1291 let storage = self.0.lock().await;
1292 storage.retrieve_document(id)
1293 }
1294
1295 async fn store_chunk(&mut self, chunk: Self::Chunk) -> Result<String> {
1296 let mut storage = self.0.lock().await;
1297 storage.store_chunk(chunk)
1298 }
1299
1300 async fn retrieve_chunk(&self, id: &str) -> Result<Option<Self::Chunk>> {
1301 let storage = self.0.lock().await;
1302 storage.retrieve_chunk(id)
1303 }
1304
1305 async fn list_entities(&self) -> Result<Vec<String>> {
1306 let storage = self.0.lock().await;
1307 storage.list_entities()
1308 }
1309
1310 async fn store_entities_batch(
1311 &mut self,
1312 entities: Vec<Self::Entity>,
1313 ) -> Result<Vec<String>> {
1314 let mut storage = self.0.lock().await;
1315 storage.store_entities_batch(entities)
1316 }
1317 }
1318
1319 pub struct LanguageModelAdapter<T>(pub Arc<T>);
1321
1322 #[async_trait]
1323 impl<T> AsyncLanguageModel for LanguageModelAdapter<T>
1324 where
1325 T: LanguageModel + Send + Sync + 'static,
1326 {
1327 type Error = T::Error;
1329
1330 async fn complete(&self, prompt: &str) -> Result<String> {
1331 self.0.complete(prompt)
1332 }
1333
1334 async fn complete_with_params(
1335 &self,
1336 prompt: &str,
1337 params: GenerationParams,
1338 ) -> Result<String> {
1339 self.0.complete_with_params(prompt, params)
1340 }
1341
1342 async fn is_available(&self) -> bool {
1343 self.0.is_available()
1344 }
1345
1346 async fn model_info(&self) -> ModelInfo {
1347 self.0.model_info()
1348 }
1349 }
1350}
1351
1352pub mod async_utils {
1354 use super::*;
1355 use std::time::Duration;
1356
1357 pub async fn with_timeout<F, T>(future: F, timeout: Duration) -> Result<T>
1359 where
1360 F: Future<Output = Result<T>>,
1361 {
1362 match tokio::time::timeout(timeout, future).await {
1363 Ok(result) => result,
1364 Err(_) => Err(crate::core::GraphRAGError::Timeout {
1365 operation: "async operation".to_string(),
1366 duration: timeout,
1367 }),
1368 }
1369 }
1370
1371 pub async fn with_retry<F, T, E>(
1373 mut operation: F,
1374 max_retries: usize,
1375 delay: Duration,
1376 ) -> std::result::Result<T, E>
1377 where
1378 F: FnMut() -> Pin<Box<dyn Future<Output = std::result::Result<T, E>> + Send>>,
1379 E: std::fmt::Debug,
1380 {
1381 let mut attempts = 0;
1382 loop {
1383 match operation().await {
1384 Ok(result) => return Ok(result),
1385 Err(err) => {
1386 attempts += 1;
1387 if attempts >= max_retries {
1388 return Err(err);
1389 }
1390 tokio::time::sleep(delay).await;
1391 },
1392 }
1393 }
1394 }
1395
1396 pub async fn process_batch_with_rate_limit<T, F, R>(
1398 items: Vec<T>,
1399 processor: F,
1400 max_concurrent: usize,
1401 rate_limit: Option<Duration>,
1402 ) -> Vec<Result<R>>
1403 where
1404 T: Send + 'static,
1405 F: Fn(T) -> Pin<Box<dyn Future<Output = Result<R>> + Send>> + Send + Sync + 'static,
1406 R: Send + 'static,
1407 {
1408 use futures::stream::{FuturesUnordered, StreamExt};
1409 use std::sync::Arc;
1410
1411 let processor = Arc::new(processor);
1412 let mut futures = FuturesUnordered::new();
1413 let mut results = Vec::with_capacity(items.len());
1414 let mut pending = 0;
1415
1416 for item in items {
1417 if pending >= max_concurrent {
1418 if let Some(result) = futures.next().await {
1419 results.push(result);
1420 pending -= 1;
1421 }
1422 }
1423
1424 let processor_clone = Arc::clone(&processor);
1425 futures.push(async move {
1426 if let Some(delay) = rate_limit {
1427 tokio::time::sleep(delay).await;
1428 }
1429 processor_clone(item).await
1430 });
1431 pending += 1;
1432 }
1433
1434 while let Some(result) = futures.next().await {
1435 results.push(result);
1436 }
1437
1438 results
1439 }
1440}
1441
1442pub type BoxedAsyncLanguageModel =
1445 Box<dyn AsyncLanguageModel<Error = crate::core::GraphRAGError> + Send + Sync>;
1446pub type BoxedAsyncEmbedder =
1448 Box<dyn AsyncEmbedder<Error = crate::core::GraphRAGError> + Send + Sync>;
1449pub type BoxedAsyncVectorStore =
1451 Box<dyn AsyncVectorStore<Error = crate::core::GraphRAGError> + Send + Sync>;
1452pub type BoxedAsyncRetriever = Box<
1454 dyn AsyncRetriever<
1455 Query = String,
1456 Result = crate::retrieval::SearchResult,
1457 Error = crate::core::GraphRAGError,
1458 > + Send
1459 + Sync,
1460>;