1use crate::core::Result;
15use std::collections::HashMap;
16use std::future::Future;
17use std::pin::Pin;
18
19use async_trait::async_trait;
20
/// Optional string key/value metadata attached to a single stored vector.
pub type VectorMetadata = Option<std::collections::HashMap<String, String>>;

/// A bulk-insertion batch of `(id, vector, metadata)` triples, mirroring the
/// arguments of a single `add_vector` call.
pub type VectorBatch = Vec<(String, Vec<f32>, VectorMetadata)>;
26
27pub trait Storage {
32 type Entity;
34 type Document;
36 type Chunk;
38 type Error: std::error::Error + Send + Sync + 'static;
40
41 fn store_entity(&mut self, entity: Self::Entity) -> Result<String>;
43
44 fn retrieve_entity(&self, id: &str) -> Result<Option<Self::Entity>>;
46
47 fn store_document(&mut self, document: Self::Document) -> Result<String>;
49
50 fn retrieve_document(&self, id: &str) -> Result<Option<Self::Document>>;
52
53 fn store_chunk(&mut self, chunk: Self::Chunk) -> Result<String>;
55
56 fn retrieve_chunk(&self, id: &str) -> Result<Option<Self::Chunk>>;
58
59 fn list_entities(&self) -> Result<Vec<String>>;
61
62 fn store_entities_batch(&mut self, entities: Vec<Self::Entity>) -> Result<Vec<String>>;
64}
65
66#[allow(async_fn_in_trait)]
71#[async_trait]
72pub trait AsyncStorage: Send + Sync {
73 type Entity: Send + Sync;
75 type Document: Send + Sync;
77 type Chunk: Send + Sync;
79 type Error: std::error::Error + Send + Sync + 'static;
81
82 async fn store_entity(&mut self, entity: Self::Entity) -> Result<String>;
84
85 async fn retrieve_entity(&self, id: &str) -> Result<Option<Self::Entity>>;
87
88 async fn store_document(&mut self, document: Self::Document) -> Result<String>;
90
91 async fn retrieve_document(&self, id: &str) -> Result<Option<Self::Document>>;
93
94 async fn store_chunk(&mut self, chunk: Self::Chunk) -> Result<String>;
96
97 async fn retrieve_chunk(&self, id: &str) -> Result<Option<Self::Chunk>>;
99
100 async fn list_entities(&self) -> Result<Vec<String>>;
102
103 async fn store_entities_batch(&mut self, entities: Vec<Self::Entity>) -> Result<Vec<String>>;
105
106 async fn health_check(&self) -> Result<bool> {
108 Ok(true)
109 }
110
111 async fn flush(&mut self) -> Result<()> {
113 Ok(())
114 }
115}
116
117pub trait Embedder {
122 type Error: std::error::Error + Send + Sync + 'static;
124
125 fn embed(&self, text: &str) -> Result<Vec<f32>>;
127
128 fn embed_batch(&self, texts: &[&str]) -> Result<Vec<Vec<f32>>>;
130
131 fn dimension(&self) -> usize;
133
134 fn is_ready(&self) -> bool;
136}
137
138#[allow(async_fn_in_trait)]
143#[async_trait]
144pub trait AsyncEmbedder: Send + Sync {
145 type Error: std::error::Error + Send + Sync + 'static;
147
148 async fn embed(&self, text: &str) -> Result<Vec<f32>>;
150
151 async fn embed_batch(&self, texts: &[&str]) -> Result<Vec<Vec<f32>>>;
153
154 async fn embed_batch_concurrent(&self, texts: &[&str], max_concurrent: usize) -> Result<Vec<Vec<f32>>> {
156 if max_concurrent <= 1 {
157 return self.embed_batch(texts).await;
158 }
159
160 let chunks: Vec<_> = texts.chunks(max_concurrent).collect();
161 let mut results = Vec::with_capacity(texts.len());
162
163 for chunk in chunks {
164 let batch_results = self.embed_batch(chunk).await?;
165 results.extend(batch_results);
166 }
167
168 Ok(results)
169 }
170
171 fn dimension(&self) -> usize;
173
174 async fn is_ready(&self) -> bool;
176
177 async fn health_check(&self) -> Result<bool> {
179 self.is_ready().await.then_some(true).ok_or_else(|| {
180 crate::core::GraphRAGError::Retrieval {
181 message: "Embedding service health check failed".to_string(),
182 }
183 })
184 }
185}
186
187pub trait VectorStore {
192 type Error: std::error::Error + Send + Sync + 'static;
194
195 fn add_vector(&mut self, id: String, vector: Vec<f32>, metadata: VectorMetadata) -> Result<()>;
197
198 fn add_vectors_batch(&mut self, vectors: VectorBatch) -> Result<()>;
200
201 fn search(&self, query_vector: &[f32], k: usize) -> Result<Vec<SearchResult>>;
203
204 fn search_with_threshold(
206 &self,
207 query_vector: &[f32],
208 k: usize,
209 threshold: f32,
210 ) -> Result<Vec<SearchResult>>;
211
212 fn remove_vector(&mut self, id: &str) -> Result<bool>;
214
215 fn len(&self) -> usize;
217
218 fn is_empty(&self) -> bool;
220}
221
222#[allow(async_fn_in_trait)]
227#[async_trait]
228pub trait AsyncVectorStore: Send + Sync {
229 type Error: std::error::Error + Send + Sync + 'static;
231
232 async fn add_vector(&mut self, id: String, vector: Vec<f32>, metadata: VectorMetadata) -> Result<()>;
234
235 async fn add_vectors_batch(&mut self, vectors: VectorBatch) -> Result<()>;
237
238 async fn add_vectors_batch_concurrent(&mut self, vectors: VectorBatch, max_concurrent: usize) -> Result<()> {
240 if max_concurrent <= 1 {
241 return self.add_vectors_batch(vectors).await;
242 }
243
244 for chunk in vectors.chunks(max_concurrent) {
245 self.add_vectors_batch(chunk.to_vec()).await?;
246 }
247
248 Ok(())
249 }
250
251 async fn search(&self, query_vector: &[f32], k: usize) -> Result<Vec<SearchResult>>;
253
254 async fn search_with_threshold(
256 &self,
257 query_vector: &[f32],
258 k: usize,
259 threshold: f32,
260 ) -> Result<Vec<SearchResult>>;
261
262 async fn search_batch(&self, query_vectors: &[Vec<f32>], k: usize) -> Result<Vec<Vec<SearchResult>>> {
264 let mut results = Vec::with_capacity(query_vectors.len());
265 for query in query_vectors {
266 let search_results = self.search(query, k).await?;
267 results.push(search_results);
268 }
269 Ok(results)
270 }
271
272 async fn remove_vector(&mut self, id: &str) -> Result<bool>;
274
275 async fn remove_vectors_batch(&mut self, ids: &[&str]) -> Result<Vec<bool>> {
277 let mut results = Vec::with_capacity(ids.len());
278 for id in ids {
279 let removed = self.remove_vector(id).await?;
280 results.push(removed);
281 }
282 Ok(results)
283 }
284
285 async fn len(&self) -> usize;
287
288 async fn is_empty(&self) -> bool {
290 self.len().await == 0
291 }
292
293 async fn health_check(&self) -> Result<bool> {
295 Ok(true)
296 }
297
298 async fn build_index(&mut self) -> Result<()> {
300 Ok(())
301 }
302}
303
/// One hit returned by a vector-store search.
///
/// `PartialEq` is derived so callers and tests can compare results directly.
#[derive(Debug, Clone, PartialEq)]
pub struct SearchResult {
    /// Identifier of the matched vector.
    pub id: String,
    /// Distance between the query and this vector; the metric (and whether
    /// smaller means closer) is implementation-defined.
    pub distance: f32,
    /// Optional string metadata stored alongside the vector.
    pub metadata: Option<HashMap<String, String>>,
}
314
315pub trait EntityExtractor {
320 type Entity;
322 type Error: std::error::Error + Send + Sync + 'static;
324
325 fn extract(&self, text: &str) -> Result<Vec<Self::Entity>>;
327
328 fn extract_with_confidence(&self, text: &str) -> Result<Vec<(Self::Entity, f32)>>;
330
331 fn set_confidence_threshold(&mut self, threshold: f32);
333}
334
335#[allow(async_fn_in_trait)]
340#[async_trait]
341pub trait AsyncEntityExtractor: Send + Sync {
342 type Entity: Send + Sync;
344 type Error: std::error::Error + Send + Sync + 'static;
346
347 async fn extract(&self, text: &str) -> Result<Vec<Self::Entity>>;
349
350 async fn extract_with_confidence(&self, text: &str) -> Result<Vec<(Self::Entity, f32)>>;
352
353 async fn extract_batch(&self, texts: &[&str]) -> Result<Vec<Vec<Self::Entity>>> {
355 let mut results = Vec::with_capacity(texts.len());
356 for text in texts {
357 let entities = self.extract(text).await?;
358 results.push(entities);
359 }
360 Ok(results)
361 }
362
363 async fn extract_batch_concurrent(&self, texts: &[&str], max_concurrent: usize) -> Result<Vec<Vec<Self::Entity>>> {
365 if max_concurrent <= 1 {
366 return self.extract_batch(texts).await;
367 }
368
369 let chunks: Vec<_> = texts.chunks(max_concurrent).collect();
370 let mut results = Vec::with_capacity(texts.len());
371
372 for chunk in chunks {
373 let batch_results = self.extract_batch(chunk).await?;
374 results.extend(batch_results);
375 }
376
377 Ok(results)
378 }
379
380 async fn set_confidence_threshold(&mut self, threshold: f32);
382
383 async fn get_confidence_threshold(&self) -> f32;
385
386 async fn health_check(&self) -> Result<bool> {
388 Ok(true)
389 }
390}
391
392pub trait Retriever {
397 type Query;
399 type Result;
401 type Error: std::error::Error + Send + Sync + 'static;
403
404 fn search(&self, query: Self::Query, k: usize) -> Result<Vec<Self::Result>>;
406
407 fn search_with_context(
409 &self,
410 query: Self::Query,
411 context: &str,
412 k: usize,
413 ) -> Result<Vec<Self::Result>>;
414
415 fn update(&mut self, content: Vec<String>) -> Result<()>;
417}
418
419#[allow(async_fn_in_trait)]
424#[async_trait]
425pub trait AsyncRetriever: Send + Sync {
426 type Query: Send + Sync;
428 type Result: Send + Sync;
430 type Error: std::error::Error + Send + Sync + 'static;
432
433 async fn search(&self, query: Self::Query, k: usize) -> Result<Vec<Self::Result>>;
435
436 async fn search_with_context(
438 &self,
439 query: Self::Query,
440 context: &str,
441 k: usize,
442 ) -> Result<Vec<Self::Result>>;
443
444 async fn search_batch(&self, queries: Vec<Self::Query>, k: usize) -> Result<Vec<Vec<Self::Result>>> {
446 let mut results = Vec::with_capacity(queries.len());
447 for query in queries {
448 let search_results = self.search(query, k).await?;
449 results.push(search_results);
450 }
451 Ok(results)
452 }
453
454 async fn update(&mut self, content: Vec<String>) -> Result<()>;
456
457 async fn update_batch(&mut self, content_batches: Vec<Vec<String>>) -> Result<()> {
459 for batch in content_batches {
460 self.update(batch).await?;
461 }
462 Ok(())
463 }
464
465 async fn refresh_index(&mut self) -> Result<()> {
467 Ok(())
468 }
469
470 async fn health_check(&self) -> Result<bool> {
472 Ok(true)
473 }
474
475 async fn get_stats(&self) -> Result<RetrievalStats> {
477 Ok(RetrievalStats::default())
478 }
479}
480
/// Aggregate statistics reported by a retriever.
///
/// `PartialEq` is derived so snapshots can be compared directly.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct RetrievalStats {
    /// Number of queries served.
    pub total_queries: u64,
    /// Mean response time in milliseconds.
    pub average_response_time_ms: f64,
    /// Size of the index (unit is implementation-defined).
    pub index_size: usize,
    /// Cache hit rate (presumably a fraction in `0.0..=1.0`).
    pub cache_hit_rate: f64,
}
493
494pub trait LanguageModel {
499 type Error: std::error::Error + Send + Sync + 'static;
501
502 fn complete(&self, prompt: &str) -> Result<String>;
504
505 fn complete_with_params(&self, prompt: &str, params: GenerationParams) -> Result<String>;
507
508 fn is_available(&self) -> bool;
510
511 fn model_info(&self) -> ModelInfo;
513}
514
515#[allow(async_fn_in_trait)]
520#[async_trait]
521pub trait AsyncLanguageModel: Send + Sync {
522 type Error: std::error::Error + Send + Sync + 'static;
524
525 async fn complete(&self, prompt: &str) -> Result<String>;
527
528 async fn complete_with_params(&self, prompt: &str, params: GenerationParams) -> Result<String>;
530
531 async fn complete_batch(&self, prompts: &[&str]) -> Result<Vec<String>> {
533 let mut results = Vec::with_capacity(prompts.len());
534 for prompt in prompts {
535 let completion = self.complete(prompt).await?;
536 results.push(completion);
537 }
538 Ok(results)
539 }
540
541 async fn complete_batch_concurrent(&self, prompts: &[&str], max_concurrent: usize) -> Result<Vec<String>> {
543 if max_concurrent <= 1 {
544 return self.complete_batch(prompts).await;
545 }
546
547 let chunks: Vec<_> = prompts.chunks(max_concurrent).collect();
548 let mut results = Vec::with_capacity(prompts.len());
549
550 for chunk in chunks {
551 let batch_results = self.complete_batch(chunk).await?;
552 results.extend(batch_results);
553 }
554
555 Ok(results)
556 }
557
558 async fn complete_streaming(&self, prompt: &str) -> Result<Pin<Box<dyn futures::Stream<Item = Result<String>> + Send>>> {
560 let result = self.complete(prompt).await?;
562 let stream = futures::stream::once(async move { Ok(result) });
563 Ok(Box::pin(stream))
564 }
565
566 async fn is_available(&self) -> bool;
568
569 async fn model_info(&self) -> ModelInfo;
571
572 async fn health_check(&self) -> Result<bool> {
574 self.is_available().await.then_some(true).ok_or_else(|| {
575 crate::core::GraphRAGError::Generation {
576 message: "Language model health check failed".to_string(),
577 }
578 })
579 }
580
581 async fn get_usage_stats(&self) -> Result<ModelUsageStats> {
583 Ok(ModelUsageStats::default())
584 }
585
586 async fn estimate_tokens(&self, prompt: &str) -> Result<usize> {
588 Ok(prompt.len() / 4)
590 }
591}
592
/// Aggregate usage statistics for a language model.
///
/// `PartialEq` is derived so snapshots can be compared directly.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct ModelUsageStats {
    /// Number of requests served.
    pub total_requests: u64,
    /// Number of tokens processed across all requests.
    pub total_tokens_processed: u64,
    /// Mean response time in milliseconds.
    pub average_response_time_ms: f64,
    /// Error rate (presumably a fraction in `0.0..=1.0`).
    pub error_rate: f64,
}
605
606#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
608pub struct GenerationParams {
609 pub max_tokens: Option<usize>,
611 pub temperature: Option<f32>,
613 pub top_p: Option<f32>,
615 pub stop_sequences: Option<Vec<String>>,
617}
618
619impl Default for GenerationParams {
620 fn default() -> Self {
621 Self {
622 max_tokens: Some(1000),
623 temperature: Some(0.7),
624 top_p: Some(0.9),
625 stop_sequences: None,
626 }
627 }
628}
629
/// Descriptive information about a language model.
///
/// `PartialEq`/`Eq` are derived (all fields support them) so descriptors can be compared.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ModelInfo {
    /// Model name.
    pub name: String,
    /// Model version, when known.
    pub version: Option<String>,
    /// Maximum context length, when known (unit presumably tokens).
    pub max_context_length: Option<usize>,
    /// Whether the model supports streaming output.
    pub supports_streaming: bool,
}
642
643pub trait GraphStore {
648 type Node;
650 type Edge;
652 type Error: std::error::Error + Send + Sync + 'static;
654
655 fn add_node(&mut self, node: Self::Node) -> Result<String>;
657
658 fn add_edge(&mut self, from_id: &str, to_id: &str, edge: Self::Edge) -> Result<String>;
660
661 fn find_nodes(&self, criteria: &str) -> Result<Vec<Self::Node>>;
663
664 fn get_neighbors(&self, node_id: &str) -> Result<Vec<Self::Node>>;
666
667 fn traverse(&self, start_id: &str, max_depth: usize) -> Result<Vec<Self::Node>>;
669
670 fn stats(&self) -> GraphStats;
672}
673
674#[allow(async_fn_in_trait)]
679#[async_trait]
680pub trait AsyncGraphStore: Send + Sync {
681 type Node: Send + Sync;
683 type Edge: Send + Sync;
685 type Error: std::error::Error + Send + Sync + 'static;
687
688 async fn add_node(&mut self, node: Self::Node) -> Result<String>;
690
691 async fn add_nodes_batch(&mut self, nodes: Vec<Self::Node>) -> Result<Vec<String>> {
693 let mut ids = Vec::with_capacity(nodes.len());
694 for node in nodes {
695 let id = self.add_node(node).await?;
696 ids.push(id);
697 }
698 Ok(ids)
699 }
700
701 async fn add_edge(&mut self, from_id: &str, to_id: &str, edge: Self::Edge) -> Result<String>;
703
704 async fn add_edges_batch(&mut self, edges: Vec<(String, String, Self::Edge)>) -> Result<Vec<String>> {
706 let mut ids = Vec::with_capacity(edges.len());
707 for (from_id, to_id, edge) in edges {
708 let id = self.add_edge(&from_id, &to_id, edge).await?;
709 ids.push(id);
710 }
711 Ok(ids)
712 }
713
714 async fn find_nodes(&self, criteria: &str) -> Result<Vec<Self::Node>>;
716
717 async fn find_nodes_batch(&self, criteria_list: &[&str]) -> Result<Vec<Vec<Self::Node>>> {
719 let mut results = Vec::with_capacity(criteria_list.len());
720 for criteria in criteria_list {
721 let nodes = self.find_nodes(criteria).await?;
722 results.push(nodes);
723 }
724 Ok(results)
725 }
726
727 async fn get_neighbors(&self, node_id: &str) -> Result<Vec<Self::Node>>;
729
730 async fn get_neighbors_batch(&self, node_ids: &[&str]) -> Result<Vec<Vec<Self::Node>>> {
732 let mut results = Vec::with_capacity(node_ids.len());
733 for node_id in node_ids {
734 let neighbors = self.get_neighbors(node_id).await?;
735 results.push(neighbors);
736 }
737 Ok(results)
738 }
739
740 async fn traverse(&self, start_id: &str, max_depth: usize) -> Result<Vec<Self::Node>>;
742
743 async fn traverse_batch(&self, start_ids: &[&str], max_depth: usize) -> Result<Vec<Vec<Self::Node>>> {
745 let mut results = Vec::with_capacity(start_ids.len());
746 for start_id in start_ids {
747 let traversal = self.traverse(start_id, max_depth).await?;
748 results.push(traversal);
749 }
750 Ok(results)
751 }
752
753 async fn stats(&self) -> GraphStats;
755
756 async fn health_check(&self) -> Result<bool> {
758 Ok(true)
759 }
760
761 async fn optimize(&mut self) -> Result<()> {
763 Ok(())
764 }
765
766 async fn export(&self) -> Result<Vec<u8>> {
768 Ok(Vec::new())
769 }
770
771 #[allow(clippy::disallowed_names)]
773 async fn import(&mut self, data: &[u8]) -> Result<()> {
774 let _ = data; Ok(())
776 }
777}
778
/// Summary statistics of a graph store.
///
/// `PartialEq` is derived so snapshots can be compared directly.
#[derive(Debug, Clone, PartialEq)]
pub struct GraphStats {
    /// Number of nodes.
    pub node_count: usize,
    /// Number of edges.
    pub edge_count: usize,
    /// Mean node degree.
    pub average_degree: f32,
    /// Maximum depth (how it is measured is implementation-defined).
    pub max_depth: usize,
}
791
792pub trait FunctionRegistry {
797 type Function;
799 type CallResult;
801 type Error: std::error::Error + Send + Sync + 'static;
803
804 fn register(&mut self, name: String, function: Self::Function) -> Result<()>;
806
807 fn call(&self, name: &str, args: &str) -> Result<Self::CallResult>;
809
810 fn list_functions(&self) -> Vec<String>;
812
813 fn has_function(&self, name: &str) -> bool;
815}
816
817#[allow(async_fn_in_trait)]
822#[async_trait]
823pub trait AsyncFunctionRegistry: Send + Sync {
824 type Function: Send + Sync;
826 type CallResult: Send + Sync;
828 type Error: std::error::Error + Send + Sync + 'static;
830
831 async fn register(&mut self, name: String, function: Self::Function) -> Result<()>;
833
834 async fn call(&self, name: &str, args: &str) -> Result<Self::CallResult>;
836
837 async fn call_batch(&self, calls: &[(&str, &str)]) -> Result<Vec<Self::CallResult>> {
839 let mut results = Vec::with_capacity(calls.len());
840 for (name, args) in calls {
841 let result = self.call(name, args).await?;
842 results.push(result);
843 }
844 Ok(results)
845 }
846
847 async fn list_functions(&self) -> Vec<String>;
849
850 async fn has_function(&self, name: &str) -> bool;
852
853 async fn get_function_info(&self, name: &str) -> Result<Option<FunctionInfo>>;
855
856 async fn health_check(&self) -> Result<bool> {
858 Ok(true)
859 }
860
861 async fn validate_args(&self, name: &str, args: &str) -> Result<bool> {
863 let _ = (name, args); Ok(true)
865 }
866}
867
/// Description of a registered function.
///
/// `PartialEq`/`Eq` are derived here and on [`ParameterInfo`] (every field supports them),
/// so descriptors can be compared directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct FunctionInfo {
    /// Function name.
    pub name: String,
    /// Optional human-readable description.
    pub description: Option<String>,
    /// Declared parameters.
    pub parameters: Vec<ParameterInfo>,
    /// Return type as a string, when known.
    pub return_type: Option<String>,
}

/// Description of one function parameter.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParameterInfo {
    /// Parameter name.
    pub name: String,
    /// Parameter type as a string.
    pub param_type: String,
    /// Optional human-readable description.
    pub description: Option<String>,
    /// Whether the parameter must be supplied.
    pub required: bool,
}
893
894pub trait ConfigProvider {
899 type Config;
901 type Error: std::error::Error + Send + Sync + 'static;
903
904 fn load(&self) -> Result<Self::Config>;
906
907 fn save(&self, config: &Self::Config) -> Result<()>;
909
910 fn validate(&self, config: &Self::Config) -> Result<()>;
912
913 fn default_config(&self) -> Self::Config;
915}
916
917#[allow(async_fn_in_trait)]
922#[async_trait]
923pub trait AsyncConfigProvider: Send + Sync {
924 type Config: Send + Sync;
926 type Error: std::error::Error + Send + Sync + 'static;
928
929 async fn load(&self) -> Result<Self::Config>;
931
932 async fn save(&self, config: &Self::Config) -> Result<()>;
934
935 async fn validate(&self, config: &Self::Config) -> Result<()>;
937
938 async fn default_config(&self) -> Self::Config;
940
941 async fn watch_changes(&self) -> Result<Pin<Box<dyn futures::Stream<Item = Result<Self::Config>> + Send + 'static>>>
943 where
944 Self::Config: 'static
945 {
946 let stream = futures::stream::empty::<Result<Self::Config>>();
948 Ok(Box::pin(stream))
949 }
950
951 async fn reload(&self) -> Result<Self::Config> {
953 self.load().await
954 }
955
956 async fn health_check(&self) -> Result<bool> {
958 Ok(true)
959 }
960}
961
962pub trait MetricsCollector {
967 fn counter(&self, name: &str, value: u64, tags: Option<&[(&str, &str)]>);
969
970 fn gauge(&self, name: &str, value: f64, tags: Option<&[(&str, &str)]>);
972
973 fn histogram(&self, name: &str, value: f64, tags: Option<&[(&str, &str)]>);
975
976 fn timer(&self, name: &str) -> Timer;
978}
979
980#[allow(async_fn_in_trait)]
985#[async_trait]
986pub trait AsyncMetricsCollector: Send + Sync {
987 async fn counter(&self, name: &str, value: u64, tags: Option<&[(&str, &str)]>);
989
990 async fn gauge(&self, name: &str, value: f64, tags: Option<&[(&str, &str)]>);
992
993 async fn histogram(&self, name: &str, value: f64, tags: Option<&[(&str, &str)]>);
995
996 async fn record_batch(&self, metrics: &[MetricRecord]) {
998 for metric in metrics {
999 match metric {
1000 MetricRecord::Counter { name, value, tags } => {
1001 let tags_refs: Option<Vec<(&str, &str)>> = tags.as_ref().map(|t| {
1002 t.iter().map(|(k, v)| (k.as_str(), v.as_str())).collect()
1003 });
1004 self.counter(name, *value, tags_refs.as_deref()).await;
1005 }
1006 MetricRecord::Gauge { name, value, tags } => {
1007 let tags_refs: Option<Vec<(&str, &str)>> = tags.as_ref().map(|t| {
1008 t.iter().map(|(k, v)| (k.as_str(), v.as_str())).collect()
1009 });
1010 self.gauge(name, *value, tags_refs.as_deref()).await;
1011 }
1012 MetricRecord::Histogram { name, value, tags } => {
1013 let tags_refs: Option<Vec<(&str, &str)>> = tags.as_ref().map(|t| {
1014 t.iter().map(|(k, v)| (k.as_str(), v.as_str())).collect()
1015 });
1016 self.histogram(name, *value, tags_refs.as_deref()).await;
1017 }
1018 }
1019 }
1020 }
1021
1022 async fn timer(&self, name: &str) -> AsyncTimer;
1024
1025 async fn health_check(&self) -> Result<bool> {
1027 Ok(true)
1028 }
1029
1030 async fn flush(&self) -> Result<()> {
1032 Ok(())
1033 }
1034}
1035
/// An owned metric sample, suitable for batching.
///
/// `PartialEq` is derived so recorded batches can be compared directly.
#[derive(Debug, Clone, PartialEq)]
pub enum MetricRecord {
    /// A counter sample.
    Counter {
        /// Metric name.
        name: String,
        /// Counter value.
        value: u64,
        /// Optional `(key, value)` tags.
        tags: Option<Vec<(String, String)>>,
    },
    /// A gauge sample.
    Gauge {
        /// Metric name.
        name: String,
        /// Gauge value.
        value: f64,
        /// Optional `(key, value)` tags.
        tags: Option<Vec<(String, String)>>,
    },
    /// A histogram observation.
    Histogram {
        /// Metric name.
        name: String,
        /// Observed value.
        value: f64,
        /// Optional `(key, value)` tags.
        tags: Option<Vec<(String, String)>>,
    },
}
1067
/// Measures wall-clock time elapsed since it was created; returned by async metrics collectors.
///
/// `Debug` is derived so timers can appear in diagnostics.
#[derive(Debug)]
pub struct AsyncTimer {
    /// Label supplied at construction.
    name: String,
    /// Instant captured by [`AsyncTimer::new`].
    start: std::time::Instant,
}

impl AsyncTimer {
    /// Starts a timer labelled `name`.
    pub fn new(name: String) -> Self {
        Self {
            name,
            start: std::time::Instant::now(),
        }
    }

    /// Stops the timer and returns the time elapsed since [`AsyncTimer::new`].
    pub async fn finish(self) -> std::time::Duration {
        self.start.elapsed()
    }

    /// Returns the label this timer was created with.
    pub fn name(&self) -> &str {
        &self.name
    }
}
1095
/// Measures wall-clock time elapsed since it was created; returned by synchronous metrics collectors.
///
/// `Debug` is derived so timers can appear in diagnostics.
#[derive(Debug)]
pub struct Timer {
    /// Label supplied at construction, exposed through [`Timer::name`].
    name: String,
    /// Instant captured by [`Timer::new`].
    start: std::time::Instant,
}

impl Timer {
    /// Starts a timer labelled `name`.
    pub fn new(name: String) -> Self {
        Self {
            name,
            start: std::time::Instant::now(),
        }
    }

    /// Stops the timer and returns the time elapsed since [`Timer::new`].
    pub fn finish(self) -> std::time::Duration {
        self.start.elapsed()
    }

    /// Returns the label this timer was created with (same accessor `AsyncTimer` offers).
    pub fn name(&self) -> &str {
        &self.name
    }
}
1119
1120pub trait Serializer {
1125 type Error: std::error::Error + Send + Sync + 'static;
1127
1128 fn serialize<T: serde::Serialize>(&self, data: &T) -> Result<String>;
1130
1131 fn deserialize<T: serde::de::DeserializeOwned>(&self, data: &str) -> Result<T>;
1133
1134 fn extension(&self) -> &'static str;
1136}
1137
1138#[allow(async_fn_in_trait)]
1143#[async_trait]
1144pub trait AsyncSerializer: Send + Sync {
1145 type Error: std::error::Error + Send + Sync + 'static;
1147
1148 async fn serialize<T: serde::Serialize + Send + Sync>(&self, data: &T) -> Result<String>;
1150
1151 async fn deserialize<T: serde::de::DeserializeOwned + Send + Sync>(&self, data: &str) -> Result<T>;
1153
1154 #[allow(clippy::disallowed_names)]
1156 async fn serialize_bytes<T: serde::Serialize + Send + Sync>(&self, data: &T) -> Result<Vec<u8>> {
1157 let string = self.serialize(data).await?;
1158 Ok(string.into_bytes())
1159 }
1160
1161 #[allow(clippy::disallowed_names)]
1163 async fn deserialize_bytes<T: serde::de::DeserializeOwned + Send + Sync>(&self, data: &[u8]) -> Result<T> {
1164 let string = String::from_utf8(data.to_vec()).map_err(|e| {
1165 crate::core::GraphRAGError::Serialization {
1166 message: format!("Invalid UTF-8 data: {e}"),
1167 }
1168 })?;
1169 self.deserialize(&string).await
1170 }
1171
1172 #[allow(clippy::disallowed_names)]
1174 async fn serialize_batch<T: serde::Serialize + Send + Sync>(&self, data: &[T]) -> Result<Vec<String>> {
1175 let mut results = Vec::with_capacity(data.len());
1176 for item in data {
1177 let serialized = self.serialize(item).await?;
1178 results.push(serialized);
1179 }
1180 Ok(results)
1181 }
1182
1183 fn extension(&self) -> &'static str;
1185
1186 async fn health_check(&self) -> Result<bool> {
1188 Ok(true)
1189 }
1190}
1191
1192pub mod sync_to_async {
1198 use super::*;
1199 use std::sync::Arc;
1200
1201 pub struct StorageAdapter<T>(pub Arc<tokio::sync::Mutex<T>>);
1203
1204 #[async_trait]
1205 impl<T> AsyncStorage for StorageAdapter<T>
1206 where
1207 T: Storage + Send + Sync + 'static,
1208 T::Entity: Send + Sync,
1209 T::Document: Send + Sync,
1210 T::Chunk: Send + Sync,
1211 {
1212 type Entity = T::Entity;
1214 type Document = T::Document;
1216 type Chunk = T::Chunk;
1218 type Error = T::Error;
1220
1221 async fn store_entity(&mut self, entity: Self::Entity) -> Result<String> {
1222 let mut storage = self.0.lock().await;
1223 storage.store_entity(entity)
1224 }
1225
1226 async fn retrieve_entity(&self, id: &str) -> Result<Option<Self::Entity>> {
1227 let storage = self.0.lock().await;
1228 storage.retrieve_entity(id)
1229 }
1230
1231 async fn store_document(&mut self, document: Self::Document) -> Result<String> {
1232 let mut storage = self.0.lock().await;
1233 storage.store_document(document)
1234 }
1235
1236 async fn retrieve_document(&self, id: &str) -> Result<Option<Self::Document>> {
1237 let storage = self.0.lock().await;
1238 storage.retrieve_document(id)
1239 }
1240
1241 async fn store_chunk(&mut self, chunk: Self::Chunk) -> Result<String> {
1242 let mut storage = self.0.lock().await;
1243 storage.store_chunk(chunk)
1244 }
1245
1246 async fn retrieve_chunk(&self, id: &str) -> Result<Option<Self::Chunk>> {
1247 let storage = self.0.lock().await;
1248 storage.retrieve_chunk(id)
1249 }
1250
1251 async fn list_entities(&self) -> Result<Vec<String>> {
1252 let storage = self.0.lock().await;
1253 storage.list_entities()
1254 }
1255
1256 async fn store_entities_batch(&mut self, entities: Vec<Self::Entity>) -> Result<Vec<String>> {
1257 let mut storage = self.0.lock().await;
1258 storage.store_entities_batch(entities)
1259 }
1260 }
1261
1262 pub struct LanguageModelAdapter<T>(pub Arc<T>);
1264
1265 #[async_trait]
1266 impl<T> AsyncLanguageModel for LanguageModelAdapter<T>
1267 where
1268 T: LanguageModel + Send + Sync + 'static,
1269 {
1270 type Error = T::Error;
1272
1273 async fn complete(&self, prompt: &str) -> Result<String> {
1274 self.0.complete(prompt)
1275 }
1276
1277 async fn complete_with_params(&self, prompt: &str, params: GenerationParams) -> Result<String> {
1278 self.0.complete_with_params(prompt, params)
1279 }
1280
1281 async fn is_available(&self) -> bool {
1282 self.0.is_available()
1283 }
1284
1285 async fn model_info(&self) -> ModelInfo {
1286 self.0.model_info()
1287 }
1288 }
1289}
1290
1291pub mod async_utils {
1293 use super::*;
1294 use std::time::Duration;
1295
1296 pub async fn with_timeout<F, T>(
1298 future: F,
1299 timeout: Duration,
1300 ) -> Result<T>
1301 where
1302 F: Future<Output = Result<T>>,
1303 {
1304 match tokio::time::timeout(timeout, future).await {
1305 Ok(result) => result,
1306 Err(_) => Err(crate::core::GraphRAGError::Timeout {
1307 operation: "async operation".to_string(),
1308 duration: timeout,
1309 }),
1310 }
1311 }
1312
1313 pub async fn with_retry<F, T, E>(
1315 mut operation: F,
1316 max_retries: usize,
1317 delay: Duration,
1318 ) -> std::result::Result<T, E>
1319 where
1320 F: FnMut() -> Pin<Box<dyn Future<Output = std::result::Result<T, E>> + Send>>,
1321 E: std::fmt::Debug,
1322 {
1323 let mut attempts = 0;
1324 loop {
1325 match operation().await {
1326 Ok(result) => return Ok(result),
1327 Err(err) => {
1328 attempts += 1;
1329 if attempts >= max_retries {
1330 return Err(err);
1331 }
1332 tokio::time::sleep(delay).await;
1333 }
1334 }
1335 }
1336 }
1337
1338 pub async fn process_batch_with_rate_limit<T, F, R>(
1340 items: Vec<T>,
1341 processor: F,
1342 max_concurrent: usize,
1343 rate_limit: Option<Duration>,
1344 ) -> Vec<Result<R>>
1345 where
1346 T: Send + 'static,
1347 F: Fn(T) -> Pin<Box<dyn Future<Output = Result<R>> + Send>> + Send + Sync + 'static,
1348 R: Send + 'static,
1349 {
1350 use futures::stream::{FuturesUnordered, StreamExt};
1351 use std::sync::Arc;
1352
1353 let processor = Arc::new(processor);
1354 let mut futures = FuturesUnordered::new();
1355 let mut results = Vec::with_capacity(items.len());
1356 let mut pending = 0;
1357
1358 for item in items {
1359 if pending >= max_concurrent {
1360 if let Some(result) = futures.next().await {
1361 results.push(result);
1362 pending -= 1;
1363 }
1364 }
1365
1366 let processor_clone = Arc::clone(&processor);
1367 futures.push(async move {
1368 if let Some(delay) = rate_limit {
1369 tokio::time::sleep(delay).await;
1370 }
1371 processor_clone(item).await
1372 });
1373 pending += 1;
1374 }
1375
1376 while let Some(result) = futures.next().await {
1377 results.push(result);
1378 }
1379
1380 results
1381 }
1382}
1383
1384pub type BoxedAsyncLanguageModel = Box<dyn AsyncLanguageModel<Error = crate::core::GraphRAGError> + Send + Sync>;
1387pub type BoxedAsyncEmbedder = Box<dyn AsyncEmbedder<Error = crate::core::GraphRAGError> + Send + Sync>;
1389pub type BoxedAsyncVectorStore = Box<dyn AsyncVectorStore<Error = crate::core::GraphRAGError> + Send + Sync>;
1391pub type BoxedAsyncRetriever = Box<dyn AsyncRetriever<Query = String, Result = crate::retrieval::SearchResult, Error = crate::core::GraphRAGError> + Send + Sync>;