1use async_trait::async_trait;
4#[cfg(feature = "vector-qdrant")]
5use qdrant_client::config::QdrantConfig as ClientConfig;
6#[cfg(feature = "vector-qdrant")]
7use qdrant_client::qdrant::{
8 Condition, CreateCollection, DeletePoints, Distance, FieldCondition, Filter, Match, PointId,
9 PointStruct, PointsIdsList, PointsSelector, SearchPoints, UpsertPoints, Value as QdrantValue,
10 VectorParams, VectorsConfig, WithPayloadSelector, WithVectorsSelector,
11};
12#[cfg(feature = "vector-qdrant")]
13use qdrant_client::Qdrant;
14use serde_json::Value;
15use std::collections::HashMap;
16use std::sync::Arc;
17use tokio::sync::RwLock;
18
19use super::types::*;
20use crate::types::AgentId;
21
/// Translates a `qdrant_client` error into this crate's `ContextError`.
///
/// HTTP-style response codes map to the closest semantic variant
/// (not-found / auth failure / bad request / server error); conversion
/// failures become `InvalidOperation`; anything else is a generic
/// `StorageError` carrying the original error text.
#[cfg(feature = "vector-qdrant")]
fn map_qdrant_error(error: qdrant_client::QdrantError) -> ContextError {
    use qdrant_client::QdrantError;

    match error {
        QdrantError::ResponseError { status, .. } => match status.code() as u16 {
            404 => ContextError::StorageError {
                reason: "Collection or point not found in Qdrant".to_string(),
            },
            401 | 403 => ContextError::AccessDenied {
                reason: "Authentication failed for Qdrant database".to_string(),
            },
            400 => ContextError::InvalidOperation {
                reason: "Invalid request to Qdrant database".to_string(),
            },
            500..=599 => ContextError::StorageError {
                reason: format!("Qdrant server error: {}", status),
            },
            _ => ContextError::StorageError {
                reason: format!("Qdrant API error: {}", status),
            },
        },
        QdrantError::ConversionError { .. } => ContextError::InvalidOperation {
            reason: "Data conversion error with Qdrant".to_string(),
        },
        other => ContextError::StorageError {
            reason: format!("Qdrant database error: {}", other),
        },
    }
}
54
/// Connection and collection settings for the Qdrant vector database.
#[derive(Debug, Clone)]
pub struct QdrantConfig {
    /// Base URL of the Qdrant instance (e.g. "http://localhost:6333").
    pub url: String,
    /// Optional API key for authenticated deployments.
    pub api_key: Option<String>,
    /// Name of the collection that stores all context vectors.
    pub collection_name: String,
    /// Dimensionality of the stored embeddings.
    pub vector_dimension: usize,
    /// Similarity metric used by the collection.
    pub distance_metric: QdrantDistance,
    /// Maximum number of points per upsert/delete request.
    pub batch_size: usize,
    /// Timeout passed to collection-creation requests, in seconds.
    pub timeout_seconds: u64,
}
73
74impl Default for QdrantConfig {
75 fn default() -> Self {
76 Self {
77 url: "http://localhost:6333".to_string(),
78 api_key: None,
79 collection_name: "symbiont_context".to_string(),
80 vector_dimension: 384, distance_metric: QdrantDistance::Cosine,
82 batch_size: 100,
83 timeout_seconds: 30,
84 }
85 }
86}
87
/// Supported similarity metrics, mirroring Qdrant's `Distance` enum.
#[derive(Debug, Clone)]
pub enum QdrantDistance {
    Cosine,
    Euclidean,
    Dot,
}
95
#[cfg(feature = "vector-qdrant")]
impl From<QdrantDistance> for Distance {
    /// Maps this crate's distance metric onto the Qdrant wire enum
    /// (note Qdrant spells Euclidean as `Euclid`).
    fn from(distance: QdrantDistance) -> Self {
        match distance {
            QdrantDistance::Cosine => Self::Cosine,
            QdrantDistance::Euclidean => Self::Euclid,
            QdrantDistance::Dot => Self::Dot,
        }
    }
}
106
/// Abstraction over a vector database backend used for storing and
/// searching agent knowledge, memories, and context embeddings.
#[async_trait]
pub trait VectorDatabase: Send + Sync {
    /// Prepares the backing store (e.g. creates collections if missing).
    async fn initialize(&self) -> Result<(), ContextError>;

    /// Stores a knowledge item with its embedding; returns the assigned id.
    async fn store_knowledge_item(
        &self,
        item: &KnowledgeItem,
        embedding: Vec<f32>,
    ) -> Result<VectorId, ContextError>;

    /// Stores an agent memory with its embedding; returns the assigned id.
    async fn store_memory_item(
        &self,
        agent_id: AgentId,
        memory: &MemoryItem,
        embedding: Vec<f32>,
    ) -> Result<VectorId, ContextError>;

    /// Stores many items in one logical operation; returns ids in input order.
    async fn batch_store(&self, batch: VectorBatchOperation)
        -> Result<Vec<VectorId>, ContextError>;

    /// Similarity search over one agent's knowledge, up to `limit` results.
    async fn search_knowledge_base(
        &self,
        agent_id: AgentId,
        query_embedding: Vec<f32>,
        limit: usize,
    ) -> Result<Vec<KnowledgeItem>, ContextError>;

    /// Similarity search returning context items scoring at least `threshold`.
    async fn semantic_search(
        &self,
        agent_id: AgentId,
        query_embedding: Vec<f32>,
        limit: usize,
        threshold: f32,
    ) -> Result<Vec<ContextItem>, ContextError>;

    /// Similarity search with additional exact-match payload `filters`.
    async fn advanced_search(
        &self,
        agent_id: AgentId,
        query_embedding: Vec<f32>,
        filters: HashMap<String, String>,
        limit: usize,
        threshold: f32,
    ) -> Result<Vec<VectorSearchResult>, ContextError>;

    /// Deletes a single stored vector by id.
    async fn delete_knowledge_item(&self, vector_id: VectorId) -> Result<(), ContextError>;

    /// Deletes many stored vectors by id.
    async fn batch_delete(&self, vector_ids: Vec<VectorId>) -> Result<(), ContextError>;

    /// Replaces metadata for a stored vector (backends may not support this).
    async fn update_metadata(
        &self,
        vector_id: VectorId,
        metadata: HashMap<String, Value>,
    ) -> Result<(), ContextError>;

    /// Returns coarse statistics about the underlying collection.
    async fn get_stats(&self) -> Result<VectorDatabaseStats, ContextError>;

    /// Creates a payload index on `field_name` to accelerate filtering.
    async fn create_index(&self, field_name: &str) -> Result<(), ContextError>;

    /// Triggers backend-specific storage optimization, if any.
    async fn optimize_collection(&self) -> Result<(), ContextError>;
}
181
/// Coarse statistics reported by a vector database backend.
#[derive(Debug, Clone)]
pub struct VectorDatabaseStats {
    /// Number of points stored in the collection.
    pub total_vectors: usize,
    /// On-disk size; 0 when the backend does not report it.
    pub collection_size_bytes: usize,
    /// Average query latency; 0.0 when the backend does not report it.
    pub avg_query_time_ms: f32,
}
189
/// Qdrant-backed `VectorDatabase` implementation holding a lazily created,
/// shared client connection.
#[cfg(feature = "vector-qdrant")]
pub struct QdrantClientWrapper {
    // None until the first request; populated lazily by `get_client`.
    client: Arc<RwLock<Option<Arc<Qdrant>>>>,
    config: QdrantConfig,
}
196
197#[cfg(feature = "vector-qdrant")]
198impl QdrantClientWrapper {
199 pub fn new(config: QdrantConfig) -> Self {
201 Self {
202 client: Arc::new(RwLock::new(None)),
203 config,
204 }
205 }
206
207 async fn get_client(&self) -> Result<Arc<Qdrant>, ContextError> {
209 let client_guard = self.client.read().await;
210 if let Some(client) = client_guard.as_ref() {
211 Ok(Arc::clone(client))
212 } else {
213 drop(client_guard);
214
215 let mut client_config = ClientConfig::from_url(&self.config.url);
217
218 if let Some(api_key) = &self.config.api_key {
219 client_config.api_key = Some(api_key.clone());
220 }
221
222 let client = Qdrant::new(client_config).map_err(map_qdrant_error)?;
223
224 let client_arc = Arc::new(client);
225 let mut client_guard = self.client.write().await;
226 *client_guard = Some(Arc::clone(&client_arc));
227
228 Ok(client_arc)
229 }
230 }
231
232 fn knowledge_item_to_metadata(
234 &self,
235 item: &KnowledgeItem,
236 agent_id: AgentId,
237 ) -> HashMap<String, QdrantValue> {
238 let mut metadata = HashMap::new();
239
240 metadata.insert(
241 "agent_id".to_string(),
242 QdrantValue::from(agent_id.to_string()),
243 );
244 metadata.insert(
245 "knowledge_id".to_string(),
246 QdrantValue::from(item.id.to_string()),
247 );
248 metadata.insert(
249 "content".to_string(),
250 QdrantValue::from(item.content.clone()),
251 );
252 metadata.insert(
253 "knowledge_type".to_string(),
254 QdrantValue::from(format!("{:?}", item.knowledge_type)),
255 );
256 metadata.insert(
257 "confidence".to_string(),
258 QdrantValue::from(item.confidence as f64),
259 );
260 metadata.insert(
261 "relevance_score".to_string(),
262 QdrantValue::from(item.relevance_score as f64),
263 );
264 metadata.insert(
265 "source".to_string(),
266 QdrantValue::from(format!("{:?}", item.source)),
267 );
268 metadata.insert(
269 "created_at".to_string(),
270 QdrantValue::from(
271 item.created_at
272 .duration_since(std::time::UNIX_EPOCH)
273 .unwrap_or_default()
274 .as_secs() as i64,
275 ),
276 );
277
278 metadata
279 }
280
    /// Reconstructs a `KnowledgeItem` from a scored search point's payload.
    ///
    /// # Errors
    /// Fails if `knowledge_id` is missing or not a valid UUID; every other
    /// field falls back to a default when absent or malformed.
    fn point_to_knowledge_item(
        &self,
        point: &qdrant_client::qdrant::ScoredPoint,
    ) -> Result<KnowledgeItem, ContextError> {
        let payload = &point.payload;

        // knowledge_id is the only mandatory payload field.
        let knowledge_id_str = payload
            .get("knowledge_id")
            .and_then(|v| self.extract_string_value(v))
            .ok_or_else(|| ContextError::StorageError {
                reason: "Missing knowledge_id in payload".to_string(),
            })?;

        let knowledge_id = KnowledgeId(uuid::Uuid::parse_str(&knowledge_id_str).map_err(|e| {
            ContextError::StorageError {
                reason: format!("Invalid knowledge_id UUID: {}", e),
            }
        })?);

        let content = payload
            .get("content")
            .and_then(|v| self.extract_string_value(v))
            .unwrap_or_default();

        let knowledge_type_str = payload
            .get("knowledge_type")
            .and_then(|v| self.extract_string_value(v))
            .unwrap_or_else(|| "Fact".to_string());

        // The type was stored via Debug formatting; unknown strings
        // default to Fact.
        let knowledge_type = match knowledge_type_str.as_str() {
            "Fact" => KnowledgeType::Fact,
            "Procedure" => KnowledgeType::Procedure,
            "Pattern" => KnowledgeType::Pattern,
            "Shared" => KnowledgeType::Shared,
            _ => KnowledgeType::Fact,
        };

        let confidence = payload
            .get("confidence")
            .and_then(|v| self.extract_f64_value(v))
            .unwrap_or(0.0) as f32;

        // Relevance comes from the similarity score, not the stored payload.
        let relevance_score = point.score;

        // NOTE(review): the stored "source" payload field is ignored here
        // and the source is fixed to Learning — confirm this is intentional.
        let source = KnowledgeSource::Learning;

        // created_at was stored as Unix seconds; missing/invalid → now().
        let created_at = payload
            .get("created_at")
            .and_then(|v| self.extract_i64_value(v))
            .map(|timestamp| {
                std::time::UNIX_EPOCH + std::time::Duration::from_secs(timestamp as u64)
            })
            .unwrap_or_else(std::time::SystemTime::now);

        Ok(KnowledgeItem {
            id: knowledge_id,
            content,
            knowledge_type,
            confidence,
            relevance_score,
            source,
            created_at,
        })
    }
346
347 fn extract_string_value(&self, value: &QdrantValue) -> Option<String> {
349 match value {
350 QdrantValue {
351 kind: Some(qdrant_client::qdrant::value::Kind::StringValue(s)),
352 } => Some(s.clone()),
353 _ => None,
354 }
355 }
356
357 fn extract_f64_value(&self, value: &QdrantValue) -> Option<f64> {
359 match value {
360 QdrantValue {
361 kind: Some(qdrant_client::qdrant::value::Kind::DoubleValue(d)),
362 } => Some(*d),
363 QdrantValue {
364 kind: Some(qdrant_client::qdrant::value::Kind::IntegerValue(i)),
365 } => Some(*i as f64),
366 _ => None,
367 }
368 }
369
370 fn extract_i64_value(&self, value: &QdrantValue) -> Option<i64> {
372 match value {
373 QdrantValue {
374 kind: Some(qdrant_client::qdrant::value::Kind::IntegerValue(i)),
375 } => Some(*i),
376 QdrantValue {
377 kind: Some(qdrant_client::qdrant::value::Kind::DoubleValue(d)),
378 } => Some(*d as i64),
379 _ => None,
380 }
381 }
382}
383
384#[cfg(feature = "vector-qdrant")]
385#[async_trait]
386impl VectorDatabase for QdrantClientWrapper {
    /// Ensures the configured collection exists, creating it with the
    /// configured dimension and distance metric when missing.
    async fn initialize(&self) -> Result<(), ContextError> {
        let client = self.get_client().await?;

        let collections = client.list_collections().await.map_err(map_qdrant_error)?;

        let collection_exists = collections
            .collections
            .iter()
            .any(|c| c.name == self.config.collection_name);

        if !collection_exists {
            // Single (unnamed) dense vector configuration sized from config.
            let vectors_config = VectorsConfig {
                config: Some(qdrant_client::qdrant::vectors_config::Config::Params(
                    VectorParams {
                        size: self.config.vector_dimension as u64,
                        distance: Distance::from(self.config.distance_metric.clone()) as i32,
                        hnsw_config: None,
                        quantization_config: None,
                        on_disk: None,
                        datatype: None,
                        multivector_config: None,
                    },
                )),
            };

            // All tuning knobs left at server defaults except the timeout.
            let create_collection = CreateCollection {
                collection_name: self.config.collection_name.clone(),
                vectors_config: Some(vectors_config),
                hnsw_config: None,
                wal_config: None,
                optimizers_config: None,
                shard_number: None,
                on_disk_payload: None,
                timeout: Some(self.config.timeout_seconds),
                replication_factor: None,
                write_consistency_factor: None,
                init_from_collection: None,
                quantization_config: None,
                sharding_method: None,
                sparse_vectors_config: None,
                strict_mode_config: None,
            };

            client
                .create_collection(create_collection)
                .await
                .map_err(map_qdrant_error)?;
        }

        Ok(())
    }
440
    /// Stores one knowledge item with its embedding and returns the
    /// freshly assigned vector id.
    async fn store_knowledge_item(
        &self,
        item: &KnowledgeItem,
        embedding: Vec<f32>,
    ) -> Result<VectorId, ContextError> {
        let client = self.get_client().await?;
        let vector_id = VectorId::new();

        // The trait signature carries no agent id for knowledge items, so a
        // deterministic placeholder AgentId is derived by hashing the item's
        // content and id. NOTE(review): the same 8 hash bytes are written
        // into both halves of the UUID — confirm this layout is intentional
        // before changing it, since it affects stored "agent_id" payloads.
        let agent_id = {
            use std::collections::hash_map::DefaultHasher;
            use std::hash::{Hash, Hasher};

            let mut hasher = DefaultHasher::new();
            item.content.hash(&mut hasher);
            item.id.0.hash(&mut hasher);
            let hash_value = hasher.finish();

            let uuid_bytes = [
                (hash_value >> 56) as u8,
                (hash_value >> 48) as u8,
                (hash_value >> 40) as u8,
                (hash_value >> 32) as u8,
                (hash_value >> 24) as u8,
                (hash_value >> 16) as u8,
                (hash_value >> 8) as u8,
                hash_value as u8,
                (hash_value >> 56) as u8,
                (hash_value >> 48) as u8,
                (hash_value >> 40) as u8,
                (hash_value >> 32) as u8,
                (hash_value >> 24) as u8,
                (hash_value >> 16) as u8,
                (hash_value >> 8) as u8,
                hash_value as u8,
            ];

            AgentId(uuid::Uuid::from_bytes(uuid_bytes))
        };

        let point = PointStruct::new(
            vector_id.0.to_string(),
            embedding,
            self.knowledge_item_to_metadata(item, agent_id),
        );

        let upsert_points = UpsertPoints {
            collection_name: self.config.collection_name.clone(),
            wait: Some(true), // block until the write is applied
            points: vec![point],
            ordering: None,
            shard_key_selector: None,
        };

        client
            .upsert_points(upsert_points)
            .await
            .map_err(map_qdrant_error)?;

        Ok(vector_id)
    }
505
    /// Stores one agent memory with its embedding; memory fields and any
    /// user metadata (namespaced with a `meta_` prefix) are flattened into
    /// the point payload. Returns the freshly assigned vector id.
    async fn store_memory_item(
        &self,
        agent_id: AgentId,
        memory: &MemoryItem,
        embedding: Vec<f32>,
    ) -> Result<VectorId, ContextError> {
        let client = self.get_client().await?;
        let vector_id = VectorId::new();

        let mut metadata = HashMap::new();
        metadata.insert(
            "agent_id".to_string(),
            QdrantValue::from(agent_id.to_string()),
        );
        metadata.insert(
            "memory_id".to_string(),
            QdrantValue::from(memory.id.to_string()),
        );
        metadata.insert(
            "content".to_string(),
            QdrantValue::from(memory.content.clone()),
        );
        // Enum stored via its Debug representation.
        metadata.insert(
            "memory_type".to_string(),
            QdrantValue::from(format!("{:?}", memory.memory_type)),
        );
        metadata.insert(
            "importance".to_string(),
            QdrantValue::from(memory.importance as f64),
        );
        metadata.insert(
            "access_count".to_string(),
            QdrantValue::from(memory.access_count as i64),
        );
        // Timestamps are stored as Unix seconds.
        metadata.insert(
            "created_at".to_string(),
            QdrantValue::from(
                memory
                    .created_at
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_secs() as i64,
            ),
        );
        metadata.insert(
            "last_accessed".to_string(),
            QdrantValue::from(
                memory
                    .last_accessed
                    .duration_since(std::time::UNIX_EPOCH)
                    .unwrap_or_default()
                    .as_secs() as i64,
            ),
        );

        // User metadata is namespaced to avoid clashing with the reserved
        // keys above.
        for (key, value) in &memory.metadata {
            metadata.insert(format!("meta_{}", key), QdrantValue::from(value.clone()));
        }

        let point = PointStruct::new(vector_id.0.to_string(), embedding, metadata);

        let upsert_points = UpsertPoints {
            collection_name: self.config.collection_name.clone(),
            wait: Some(true), // block until the write is applied
            points: vec![point],
            ordering: None,
            shard_key_selector: None,
        };

        client
            .upsert_points(upsert_points)
            .await
            .map_err(map_qdrant_error)?;

        Ok(vector_id)
    }
584
    /// Stores a batch of items, chunked into upserts of `batch_size`.
    ///
    /// Items without an explicit id receive a fresh `VectorId`; items
    /// without an embedding get a zero vector of the configured dimension.
    /// Returns the vector ids in the same order as `batch.items`.
    async fn batch_store(
        &self,
        batch: VectorBatchOperation,
    ) -> Result<Vec<VectorId>, ContextError> {
        let client = self.get_client().await?;
        let mut points = Vec::new();
        let mut vector_ids = Vec::new();

        for item in &batch.items {
            let vector_id = item.id.unwrap_or_else(VectorId::new);
            vector_ids.push(vector_id);

            let mut metadata = HashMap::new();
            metadata.insert(
                "agent_id".to_string(),
                QdrantValue::from(item.metadata.agent_id.to_string()),
            );
            metadata.insert(
                "content_type".to_string(),
                QdrantValue::from(format!("{:?}", item.metadata.content_type)),
            );
            metadata.insert(
                "source_id".to_string(),
                QdrantValue::from(item.metadata.source_id.clone()),
            );
            // Timestamps are stored as Unix seconds.
            metadata.insert(
                "created_at".to_string(),
                QdrantValue::from(
                    item.metadata
                        .created_at
                        .duration_since(std::time::UNIX_EPOCH)
                        .unwrap_or_default()
                        .as_secs() as i64,
                ),
            );
            metadata.insert(
                "updated_at".to_string(),
                QdrantValue::from(
                    item.metadata
                        .updated_at
                        .duration_since(std::time::UNIX_EPOCH)
                        .unwrap_or_default()
                        .as_secs() as i64,
                ),
            );

            // Tags are stored positionally (tag_0, tag_1, ...).
            for (i, tag) in item.metadata.tags.iter().enumerate() {
                metadata.insert(format!("tag_{}", i), QdrantValue::from(tag.clone()));
            }

            // Custom fields are namespaced to avoid clashing with the
            // reserved keys above.
            for (key, value) in &item.metadata.custom_fields {
                metadata.insert(format!("custom_{}", key), QdrantValue::from(value.clone()));
            }

            let embedding = item
                .embedding
                .clone()
                .unwrap_or_else(|| vec![0.0; self.config.vector_dimension]);

            let point = PointStruct::new(vector_id.0.to_string(), embedding, metadata);

            points.push(point);
        }

        // Upsert in chunks so a large batch does not exceed request limits.
        let batch_size = self.config.batch_size;
        for chunk in points.chunks(batch_size) {
            let upsert_points = UpsertPoints {
                collection_name: self.config.collection_name.clone(),
                wait: Some(true), // block until each chunk is applied
                points: chunk.to_vec(),
                ordering: None,
                shard_key_selector: None,
            };

            client
                .upsert_points(upsert_points)
                .await
                .map_err(map_qdrant_error)?;
        }

        Ok(vector_ids)
    }
671
    /// Similarity search scoped to one agent, decoding hits into
    /// `KnowledgeItem`s.
    ///
    /// Points whose payload cannot be decoded are logged and skipped
    /// rather than failing the whole search.
    async fn search_knowledge_base(
        &self,
        agent_id: AgentId,
        query_embedding: Vec<f32>,
        limit: usize,
    ) -> Result<Vec<KnowledgeItem>, ContextError> {
        let client = self.get_client().await?;

        // Restrict results to points tagged with this agent's id.
        let filter = Filter {
            should: vec![],
            min_should: None,
            must: vec![Condition {
                condition_one_of: Some(qdrant_client::qdrant::condition::ConditionOneOf::Field(
                    FieldCondition {
                        key: "agent_id".to_string(),
                        r#match: Some(Match {
                            match_value: Some(qdrant_client::qdrant::r#match::MatchValue::Keyword(
                                agent_id.to_string(),
                            )),
                        }),
                        range: None,
                        geo_bounding_box: None,
                        geo_radius: None,
                        values_count: None,
                        geo_polygon: None,
                        datetime_range: None,
                        is_empty: None,
                        is_null: None,
                    },
                )),
            }],
            must_not: vec![],
        };

        let search_points = SearchPoints {
            collection_name: self.config.collection_name.clone(),
            vector: query_embedding,
            vector_name: None,
            filter: Some(filter),
            limit: limit as u64,
            // Payload is required to rebuild KnowledgeItems from hits.
            with_payload: Some(WithPayloadSelector {
                selector_options: Some(
                    qdrant_client::qdrant::with_payload_selector::SelectorOptions::Enable(true),
                ),
            }),
            params: None,
            score_threshold: None,
            offset: None,
            with_vectors: None,
            read_consistency: None,
            shard_key_selector: None,
            sparse_indices: None,
            timeout: None,
        };

        let search_result = client
            .search_points(search_points)
            .await
            .map_err(map_qdrant_error)?;

        let mut knowledge_items = Vec::new();
        for point in search_result.result {
            match self.point_to_knowledge_item(&point) {
                Ok(item) => knowledge_items.push(item),
                Err(e) => {
                    // Best-effort: skip undecodable points instead of failing.
                    // NOTE(review): consider routing this through the crate's
                    // logging facility instead of eprintln.
                    eprintln!("Failed to convert point to knowledge item: {}", e);
                }
            }
        }

        Ok(knowledge_items)
    }
746
    /// Similarity search scoped to one agent that maps hits into generic
    /// `ContextItem`s, applying the score `threshold` server-side.
    async fn semantic_search(
        &self,
        agent_id: AgentId,
        query_embedding: Vec<f32>,
        limit: usize,
        threshold: f32,
    ) -> Result<Vec<ContextItem>, ContextError> {
        let client = self.get_client().await?;

        // Restrict results to points tagged with this agent's id.
        let filter = Filter {
            should: vec![],
            min_should: None,
            must: vec![Condition {
                condition_one_of: Some(qdrant_client::qdrant::condition::ConditionOneOf::Field(
                    FieldCondition {
                        key: "agent_id".to_string(),
                        r#match: Some(Match {
                            match_value: Some(qdrant_client::qdrant::r#match::MatchValue::Keyword(
                                agent_id.to_string(),
                            )),
                        }),
                        range: None,
                        geo_bounding_box: None,
                        geo_radius: None,
                        values_count: None,
                        geo_polygon: None,
                        datetime_range: None,
                        is_empty: None,
                        is_null: None,
                    },
                )),
            }],
            must_not: vec![],
        };

        let search_points = SearchPoints {
            collection_name: self.config.collection_name.clone(),
            vector: query_embedding,
            vector_name: None,
            filter: Some(filter),
            limit: limit as u64,
            with_payload: Some(WithPayloadSelector {
                selector_options: Some(
                    qdrant_client::qdrant::with_payload_selector::SelectorOptions::Enable(true),
                ),
            }),
            params: None,
            // Let Qdrant drop low-similarity hits server-side.
            score_threshold: Some(threshold),
            offset: None,
            with_vectors: None,
            read_consistency: None,
            shard_key_selector: None,
            sparse_indices: None,
            timeout: None,
        };

        let search_result = client
            .search_points(search_points)
            .await
            .map_err(map_qdrant_error)?;

        let mut context_items = Vec::new();
        for point in search_result.result {
            let payload = &point.payload;
            // NOTE(review): this reads "context_id"/"timestamp" payload keys,
            // but the store paths in this file write "memory_id"/"created_at".
            // Confirm which writer produces these keys.
            let context_id_str = payload
                .get("context_id")
                .and_then(|v| self.extract_string_value(v))
                .unwrap_or_default();

            // Missing or invalid ids fall back to a fresh random UUID.
            let context_id = ContextId(
                uuid::Uuid::parse_str(&context_id_str).unwrap_or_else(|_| uuid::Uuid::new_v4()),
            );

            let content = payload
                .get("content")
                .and_then(|v| self.extract_string_value(v))
                .unwrap_or_default();

            // The item type is fixed: the payload does not record the
            // original context-item type.
            let item_type = ContextItemType::Memory(MemoryType::Semantic);

            let timestamp = payload
                .get("timestamp")
                .and_then(|v| self.extract_i64_value(v))
                .map(|ts| std::time::UNIX_EPOCH + std::time::Duration::from_secs(ts as u64))
                .unwrap_or_else(std::time::SystemTime::now);

            // Recover user metadata stored under the "meta_" namespace.
            let mut metadata = HashMap::new();
            for (key, value) in payload {
                if key.starts_with("meta_") {
                    let meta_key = key.strip_prefix("meta_").unwrap_or(key);
                    if let Some(str_value) = self.extract_string_value(value) {
                        metadata.insert(meta_key.to_string(), str_value);
                    }
                }
            }

            context_items.push(ContextItem {
                id: context_id,
                content,
                item_type,
                relevance_score: point.score,
                timestamp,
                metadata,
            });
        }

        Ok(context_items)
    }
857
858 async fn delete_knowledge_item(&self, vector_id: VectorId) -> Result<(), ContextError> {
859 let client = self.get_client().await?;
860
861 let delete_points = DeletePoints {
862 collection_name: self.config.collection_name.clone(),
863 wait: Some(true),
864 points: Some(PointsSelector {
865 points_selector_one_of: Some(
866 qdrant_client::qdrant::points_selector::PointsSelectorOneOf::Points(
867 PointsIdsList {
868 ids: vec![PointId::from(vector_id.0.to_string())],
869 },
870 ),
871 ),
872 }),
873 ordering: None,
874 shard_key_selector: None,
875 };
876
877 client
878 .delete_points(delete_points)
879 .await
880 .map_err(map_qdrant_error)?;
881
882 Ok(())
883 }
884
    /// Similarity search combining the agent scope with caller-supplied
    /// exact-match payload `filters` (all ANDed); returns raw results
    /// including the stored vectors.
    async fn advanced_search(
        &self,
        agent_id: AgentId,
        query_embedding: Vec<f32>,
        filters: HashMap<String, String>,
        limit: usize,
        threshold: f32,
    ) -> Result<Vec<VectorSearchResult>, ContextError> {
        let client = self.get_client().await?;

        // Mandatory agent-scope condition...
        let mut conditions = vec![Condition {
            condition_one_of: Some(qdrant_client::qdrant::condition::ConditionOneOf::Field(
                FieldCondition {
                    key: "agent_id".to_string(),
                    r#match: Some(Match {
                        match_value: Some(qdrant_client::qdrant::r#match::MatchValue::Keyword(
                            agent_id.to_string(),
                        )),
                    }),
                    range: None,
                    geo_bounding_box: None,
                    geo_radius: None,
                    values_count: None,
                    geo_polygon: None,
                    datetime_range: None,
                    is_empty: None,
                    is_null: None,
                },
            )),
        }];

        // ...plus one keyword-match condition per caller filter.
        for (key, value) in filters {
            conditions.push(Condition {
                condition_one_of: Some(qdrant_client::qdrant::condition::ConditionOneOf::Field(
                    FieldCondition {
                        key,
                        r#match: Some(Match {
                            match_value: Some(qdrant_client::qdrant::r#match::MatchValue::Keyword(
                                value,
                            )),
                        }),
                        range: None,
                        geo_bounding_box: None,
                        geo_radius: None,
                        values_count: None,
                        geo_polygon: None,
                        datetime_range: None,
                        is_empty: None,
                        is_null: None,
                    },
                )),
            });
        }

        let filter = Filter {
            should: vec![],
            min_should: None,
            must: conditions,
            must_not: vec![],
        };

        let search_points = SearchPoints {
            collection_name: self.config.collection_name.clone(),
            vector: query_embedding,
            vector_name: None,
            filter: Some(filter),
            limit: limit as u64,
            with_payload: Some(WithPayloadSelector {
                selector_options: Some(
                    qdrant_client::qdrant::with_payload_selector::SelectorOptions::Enable(true),
                ),
            }),
            params: None,
            // Drop low-similarity hits server-side.
            score_threshold: Some(threshold),
            offset: None,
            // Also fetch the stored vectors so callers can inspect embeddings.
            with_vectors: Some(WithVectorsSelector {
                selector_options: Some(
                    qdrant_client::qdrant::with_vectors_selector::SelectorOptions::Enable(true),
                ),
            }),
            read_consistency: None,
            shard_key_selector: None,
            sparse_indices: None,
            timeout: None,
        };

        let search_result = client
            .search_points(search_points)
            .await
            .map_err(map_qdrant_error)?;

        let mut results = Vec::new();
        for point in search_result.result {
            let payload = &point.payload;
            // Point ids may be UUID strings or numeric; missing ids fall
            // back to a fresh random UUID.
            let vector_id_str = point
                .id
                .map(|id| match id.point_id_options {
                    Some(qdrant_client::qdrant::point_id::PointIdOptions::Uuid(uuid)) => uuid,
                    Some(qdrant_client::qdrant::point_id::PointIdOptions::Num(num)) => {
                        num.to_string()
                    }
                    None => uuid::Uuid::new_v4().to_string(),
                })
                .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());

            // Unparsable ids (e.g. numeric) also fall back to a random UUID.
            let vector_id = VectorId(
                uuid::Uuid::parse_str(&vector_id_str).unwrap_or_else(|_| uuid::Uuid::new_v4()),
            );

            let content = payload
                .get("content")
                .and_then(|v| self.extract_string_value(v))
                .unwrap_or_default();

            // Surface every string-valued payload entry as result metadata.
            let mut metadata = HashMap::new();
            for (key, value) in payload {
                if let Some(str_value) = self.extract_string_value(value) {
                    metadata.insert(key.clone(), str_value);
                }
            }

            // Only plain dense vectors are surfaced; other representations
            // yield None.
            let embedding = point
                .vectors
                .and_then(|vectors| match vectors.vectors_options {
                    Some(qdrant_client::qdrant::vectors_output::VectorsOptions::Vector(vector)) => {
                        Some(vector.data)
                    }
                    _ => None,
                });

            results.push(VectorSearchResult {
                id: vector_id,
                content,
                score: point.score,
                metadata,
                embedding,
            });
        }

        Ok(results)
    }
1030
    /// Deletes many points, issuing one request per `batch_size` chunk.
    async fn batch_delete(&self, vector_ids: Vec<VectorId>) -> Result<(), ContextError> {
        let client = self.get_client().await?;

        let batch_size = self.config.batch_size;
        for chunk in vector_ids.chunks(batch_size) {
            let ids: Vec<PointId> = chunk
                .iter()
                .map(|id| PointId::from(id.0.to_string()))
                .collect();

            let delete_points = DeletePoints {
                collection_name: self.config.collection_name.clone(),
                wait: Some(true), // block until each batch is applied
                points: Some(PointsSelector {
                    points_selector_one_of: Some(
                        qdrant_client::qdrant::points_selector::PointsSelectorOneOf::Points(
                            PointsIdsList { ids },
                        ),
                    ),
                }),
                ordering: None,
                shard_key_selector: None,
            };

            client
                .delete_points(delete_points)
                .await
                .map_err(map_qdrant_error)?;
        }

        Ok(())
    }
1064
    /// Direct metadata updates are intentionally unsupported by this
    /// backend; callers should re-store the item to change its payload.
    async fn update_metadata(
        &self,
        _vector_id: VectorId,
        _metadata: HashMap<String, Value>,
    ) -> Result<(), ContextError> {
        Err(ContextError::InvalidOperation {
            reason: "Direct metadata updates not supported, use store_knowledge_item to update"
                .to_string(),
        })
    }
1077
    /// Creates a keyword payload index on `field_name` to speed up the
    /// filtered searches used elsewhere in this impl (e.g. on "agent_id").
    async fn create_index(&self, field_name: &str) -> Result<(), ContextError> {
        let client = self.get_client().await?;

        let create_index = qdrant_client::qdrant::CreateFieldIndexCollection {
            collection_name: self.config.collection_name.clone(),
            wait: Some(true), // block until the index is built
            field_name: field_name.to_string(),
            field_type: Some(qdrant_client::qdrant::FieldType::Keyword as i32),
            field_index_params: None,
            ordering: None,
        };

        client
            .create_field_index(create_index)
            .await
            .map_err(map_qdrant_error)?;

        Ok(())
    }
1098
    /// Currently a no-op: no optimization request is sent to Qdrant. The
    /// client handle is still resolved so connection failures surface here.
    async fn optimize_collection(&self) -> Result<(), ContextError> {
        let _client = self.get_client().await?;

        Ok(())
    }
1114
1115 async fn get_stats(&self) -> Result<VectorDatabaseStats, ContextError> {
1116 let client = self.get_client().await?;
1117
1118 let collection_info = client
1119 .collection_info(&self.config.collection_name)
1120 .await
1121 .map_err(map_qdrant_error)?;
1122
1123 Ok(VectorDatabaseStats {
1124 total_vectors: collection_info
1125 .result
1126 .map(|r| r.points_count.unwrap_or(0) as usize)
1127 .unwrap_or(0),
1128 collection_size_bytes: 0, avg_query_time_ms: 0.0, })
1131 }
1132}
1133
/// Object-safe adapter: forwards every `vector_db_trait::VectorDb` method to
/// the `VectorDatabase` implementation above. `health_check` is implemented
/// as a successful `get_stats` round trip.
#[cfg(feature = "vector-qdrant")]
#[async_trait]
impl super::vector_db_trait::VectorDb for QdrantClientWrapper {
    async fn initialize(&self) -> Result<(), ContextError> {
        <Self as VectorDatabase>::initialize(self).await
    }
    async fn store_knowledge_item(
        &self,
        item: &KnowledgeItem,
        embedding: Vec<f32>,
    ) -> Result<VectorId, ContextError> {
        <Self as VectorDatabase>::store_knowledge_item(self, item, embedding).await
    }
    async fn store_memory_item(
        &self,
        agent_id: AgentId,
        memory: &MemoryItem,
        embedding: Vec<f32>,
    ) -> Result<VectorId, ContextError> {
        <Self as VectorDatabase>::store_memory_item(self, agent_id, memory, embedding).await
    }
    async fn batch_store(
        &self,
        batch: VectorBatchOperation,
    ) -> Result<Vec<VectorId>, ContextError> {
        <Self as VectorDatabase>::batch_store(self, batch).await
    }
    async fn search_knowledge_base(
        &self,
        agent_id: AgentId,
        query_embedding: Vec<f32>,
        limit: usize,
    ) -> Result<Vec<KnowledgeItem>, ContextError> {
        <Self as VectorDatabase>::search_knowledge_base(self, agent_id, query_embedding, limit)
            .await
    }
    async fn semantic_search(
        &self,
        agent_id: AgentId,
        query_embedding: Vec<f32>,
        limit: usize,
        threshold: f32,
    ) -> Result<Vec<ContextItem>, ContextError> {
        <Self as VectorDatabase>::semantic_search(self, agent_id, query_embedding, limit, threshold)
            .await
    }
    async fn advanced_search(
        &self,
        agent_id: AgentId,
        query_embedding: Vec<f32>,
        filters: HashMap<String, String>,
        limit: usize,
        threshold: f32,
    ) -> Result<Vec<VectorSearchResult>, ContextError> {
        <Self as VectorDatabase>::advanced_search(
            self,
            agent_id,
            query_embedding,
            filters,
            limit,
            threshold,
        )
        .await
    }
    async fn delete_knowledge_item(&self, vector_id: VectorId) -> Result<(), ContextError> {
        <Self as VectorDatabase>::delete_knowledge_item(self, vector_id).await
    }
    async fn batch_delete(&self, vector_ids: Vec<VectorId>) -> Result<(), ContextError> {
        <Self as VectorDatabase>::batch_delete(self, vector_ids).await
    }
    async fn update_metadata(
        &self,
        vector_id: VectorId,
        metadata: HashMap<String, Value>,
    ) -> Result<(), ContextError> {
        <Self as VectorDatabase>::update_metadata(self, vector_id, metadata).await
    }
    async fn get_stats(&self) -> Result<VectorDatabaseStats, ContextError> {
        <Self as VectorDatabase>::get_stats(self).await
    }
    async fn create_index(&self, field_name: &str) -> Result<(), ContextError> {
        <Self as VectorDatabase>::create_index(self, field_name).await
    }
    async fn optimize_collection(&self) -> Result<(), ContextError> {
        <Self as VectorDatabase>::optimize_collection(self).await
    }
    // Health is inferred from a stats round trip succeeding.
    async fn health_check(&self) -> Result<bool, ContextError> {
        <Self as VectorDatabase>::get_stats(self)
            .await
            .map(|_| true)
    }
}
1226
/// Stub backend used when no vector database is enabled: every operation
/// succeeds without storing or returning any data.
pub struct NoOpVectorDatabase;
1229
/// No-op implementation: writes return fresh ids without persisting,
/// searches return empty results, and stats are all zeroes.
#[async_trait]
impl VectorDatabase for NoOpVectorDatabase {
    async fn initialize(&self) -> Result<(), ContextError> {
        Ok(())
    }

    async fn store_knowledge_item(
        &self,
        _item: &KnowledgeItem,
        _embedding: Vec<f32>,
    ) -> Result<VectorId, ContextError> {
        // Data is discarded; a fresh id keeps callers' bookkeeping working.
        Ok(VectorId::new())
    }

    async fn store_memory_item(
        &self,
        _agent_id: AgentId,
        _memory: &MemoryItem,
        _embedding: Vec<f32>,
    ) -> Result<VectorId, ContextError> {
        Ok(VectorId::new())
    }

    async fn batch_store(
        &self,
        batch: VectorBatchOperation,
    ) -> Result<Vec<VectorId>, ContextError> {
        // One fresh id per input item, matching the batch length.
        Ok(batch.items.iter().map(|_| VectorId::new()).collect())
    }

    async fn search_knowledge_base(
        &self,
        _agent_id: AgentId,
        _query_embedding: Vec<f32>,
        _limit: usize,
    ) -> Result<Vec<KnowledgeItem>, ContextError> {
        Ok(Vec::new())
    }

    async fn semantic_search(
        &self,
        _agent_id: AgentId,
        _query_embedding: Vec<f32>,
        _limit: usize,
        _threshold: f32,
    ) -> Result<Vec<ContextItem>, ContextError> {
        Ok(Vec::new())
    }

    async fn advanced_search(
        &self,
        _agent_id: AgentId,
        _query_embedding: Vec<f32>,
        _filters: HashMap<String, String>,
        _limit: usize,
        _threshold: f32,
    ) -> Result<Vec<VectorSearchResult>, ContextError> {
        Ok(Vec::new())
    }

    async fn delete_knowledge_item(&self, _vector_id: VectorId) -> Result<(), ContextError> {
        Ok(())
    }

    async fn batch_delete(&self, _vector_ids: Vec<VectorId>) -> Result<(), ContextError> {
        Ok(())
    }

    async fn update_metadata(
        &self,
        _vector_id: VectorId,
        _metadata: HashMap<String, Value>,
    ) -> Result<(), ContextError> {
        Ok(())
    }

    async fn get_stats(&self) -> Result<VectorDatabaseStats, ContextError> {
        Ok(VectorDatabaseStats {
            total_vectors: 0,
            collection_size_bytes: 0,
            avg_query_time_ms: 0.0,
        })
    }

    async fn create_index(&self, _field_name: &str) -> Result<(), ContextError> {
        Ok(())
    }

    async fn optimize_collection(&self) -> Result<(), ContextError> {
        Ok(())
    }
}
1322
1323#[async_trait]
1324impl super::vector_db_trait::VectorDb for NoOpVectorDatabase {
1325 async fn initialize(&self) -> Result<(), ContextError> {
1326 Ok(())
1327 }
1328 async fn store_knowledge_item(
1329 &self,
1330 _item: &KnowledgeItem,
1331 _embedding: Vec<f32>,
1332 ) -> Result<VectorId, ContextError> {
1333 Ok(VectorId::new())
1334 }
1335 async fn store_memory_item(
1336 &self,
1337 _agent_id: AgentId,
1338 _memory: &MemoryItem,
1339 _embedding: Vec<f32>,
1340 ) -> Result<VectorId, ContextError> {
1341 Ok(VectorId::new())
1342 }
1343 async fn batch_store(
1344 &self,
1345 batch: VectorBatchOperation,
1346 ) -> Result<Vec<VectorId>, ContextError> {
1347 Ok(batch.items.iter().map(|_| VectorId::new()).collect())
1348 }
1349 async fn search_knowledge_base(
1350 &self,
1351 _agent_id: AgentId,
1352 _query_embedding: Vec<f32>,
1353 _limit: usize,
1354 ) -> Result<Vec<KnowledgeItem>, ContextError> {
1355 Ok(Vec::new())
1356 }
1357 async fn semantic_search(
1358 &self,
1359 _agent_id: AgentId,
1360 _query_embedding: Vec<f32>,
1361 _limit: usize,
1362 _threshold: f32,
1363 ) -> Result<Vec<ContextItem>, ContextError> {
1364 Ok(Vec::new())
1365 }
1366 async fn advanced_search(
1367 &self,
1368 _agent_id: AgentId,
1369 _query_embedding: Vec<f32>,
1370 _filters: HashMap<String, String>,
1371 _limit: usize,
1372 _threshold: f32,
1373 ) -> Result<Vec<VectorSearchResult>, ContextError> {
1374 Ok(Vec::new())
1375 }
1376 async fn delete_knowledge_item(&self, _vector_id: VectorId) -> Result<(), ContextError> {
1377 Ok(())
1378 }
1379 async fn batch_delete(&self, _vector_ids: Vec<VectorId>) -> Result<(), ContextError> {
1380 Ok(())
1381 }
1382 async fn update_metadata(
1383 &self,
1384 _vector_id: VectorId,
1385 _metadata: HashMap<String, Value>,
1386 ) -> Result<(), ContextError> {
1387 Ok(())
1388 }
1389 async fn get_stats(&self) -> Result<VectorDatabaseStats, ContextError> {
1390 Ok(VectorDatabaseStats {
1391 total_vectors: 0,
1392 collection_size_bytes: 0,
1393 avg_query_time_ms: 0.0,
1394 })
1395 }
1396 async fn create_index(&self, _field_name: &str) -> Result<(), ContextError> {
1397 Ok(())
1398 }
1399 async fn optimize_collection(&self) -> Result<(), ContextError> {
1400 Ok(())
1401 }
1402 async fn health_check(&self) -> Result<bool, ContextError> {
1403 Ok(true)
1404 }
1405}
1406
/// Turns text into fixed-length embedding vectors for vector-database
/// storage and similarity search.
#[async_trait]
pub trait EmbeddingService: Send + Sync {
    /// Embeds a single text; the returned vector has
    /// [`embedding_dimension`](Self::embedding_dimension) components.
    async fn generate_embedding(&self, text: &str) -> Result<Vec<f32>, ContextError>;

    /// Embeds several texts, returning one vector per input in order.
    async fn generate_batch_embeddings(
        &self,
        texts: Vec<&str>,
    ) -> Result<Vec<Vec<f32>>, ContextError>;

    /// Number of components in every vector this service produces.
    fn embedding_dimension(&self) -> usize;

    /// Longest input text (in characters or bytes — implementation-defined)
    /// this service is designed to handle.
    fn max_text_length(&self) -> usize;
}
1425
/// Deterministic, dependency-free embedding service for tests and local
/// development: vectors are derived directly from the input text's bytes.
pub struct MockEmbeddingService {
    /// Number of components in every generated embedding.
    dimension: usize,
}
1430
impl MockEmbeddingService {
    /// Creates a mock embedder that emits vectors of `dimension` components.
    pub fn new(dimension: usize) -> Self {
        Self { dimension }
    }
}
1436
1437#[async_trait]
1438impl EmbeddingService for MockEmbeddingService {
1439 async fn generate_embedding(&self, text: &str) -> Result<Vec<f32>, ContextError> {
1440 let mut embedding = vec![0.0; self.dimension];
1442 let text_bytes = text.as_bytes();
1443
1444 for (i, val) in embedding.iter_mut().enumerate() {
1445 let byte_index = i % text_bytes.len();
1446 let byte_val = text_bytes.get(byte_index).unwrap_or(&0);
1447 *val = (*byte_val as f32 / 255.0) * 2.0 - 1.0; }
1449
1450 let magnitude: f32 = embedding.iter().map(|x| x * x).sum::<f32>().sqrt();
1452 if magnitude > 0.0 {
1453 for val in &mut embedding {
1454 *val /= magnitude;
1455 }
1456 }
1457
1458 Ok(embedding)
1459 }
1460
1461 async fn generate_batch_embeddings(
1462 &self,
1463 texts: Vec<&str>,
1464 ) -> Result<Vec<Vec<f32>>, ContextError> {
1465 let mut embeddings = Vec::new();
1466 for text in texts {
1467 embeddings.push(self.generate_embedding(text).await?);
1468 }
1469 Ok(embeddings)
1470 }
1471
1472 fn embedding_dimension(&self) -> usize {
1473 self.dimension
1474 }
1475
1476 fn max_text_length(&self) -> usize {
1477 8192 }
1479}
1480
/// TF-IDF based embedding service: projects term-frequency/inverse-document-
/// frequency scores into a fixed-size vector by hashing vocabulary indices
/// modulo the dimension. Requires `build_vocabulary` to be called first.
pub struct TfIdfEmbeddingService {
    /// Number of components in every generated embedding.
    dimension: usize,
    /// Word -> vocabulary index, assigned in first-seen order.
    vocabulary: Arc<RwLock<HashMap<String, usize>>>,
    /// Word -> IDF weight computed from the training corpus.
    idf_scores: Arc<RwLock<HashMap<String, f32>>>,
}
1487
1488impl TfIdfEmbeddingService {
1489 pub fn new(dimension: usize) -> Self {
1490 Self {
1491 dimension,
1492 vocabulary: Arc::new(RwLock::new(HashMap::new())),
1493 idf_scores: Arc::new(RwLock::new(HashMap::new())),
1494 }
1495 }
1496
1497 pub async fn build_vocabulary(&self, documents: Vec<&str>) -> Result<(), ContextError> {
1499 let mut vocab = self.vocabulary.write().await;
1500 let mut doc_frequencies = HashMap::new();
1501 let total_docs = documents.len() as f32;
1502
1503 for doc in &documents {
1505 let words: std::collections::HashSet<String> = doc
1506 .to_lowercase()
1507 .split_whitespace()
1508 .map(|s| s.trim_matches(|c: char| !c.is_alphanumeric()))
1509 .filter(|s| !s.is_empty())
1510 .map(|s| s.to_string())
1511 .collect();
1512
1513 for word in words {
1514 let vocab_len = vocab.len();
1515 vocab.entry(word.clone()).or_insert(vocab_len);
1516 *doc_frequencies.entry(word).or_insert(0) += 1;
1517 }
1518 }
1519
1520 let mut idf_scores = self.idf_scores.write().await;
1522 for (word, doc_freq) in doc_frequencies {
1523 let idf = (total_docs / doc_freq as f32).ln();
1524 idf_scores.insert(word, idf);
1525 }
1526
1527 Ok(())
1528 }
1529
1530 fn tokenize(&self, text: &str) -> Vec<String> {
1531 text.to_lowercase()
1532 .split_whitespace()
1533 .map(|s| s.trim_matches(|c: char| !c.is_alphanumeric()))
1534 .filter(|s| !s.is_empty())
1535 .map(|s| s.to_string())
1536 .collect()
1537 }
1538}
1539
1540#[async_trait]
1541impl EmbeddingService for TfIdfEmbeddingService {
1542 async fn generate_embedding(&self, text: &str) -> Result<Vec<f32>, ContextError> {
1543 let vocab = self.vocabulary.read().await;
1544 let idf_scores = self.idf_scores.read().await;
1545
1546 let mut embedding = vec![0.0; self.dimension];
1547 let tokens = self.tokenize(text);
1548 let total_tokens = tokens.len() as f32;
1549
1550 if total_tokens == 0.0 {
1551 return Ok(embedding);
1552 }
1553
1554 let mut tf_counts = HashMap::new();
1556 for token in &tokens {
1557 *tf_counts.entry(token.clone()).or_insert(0) += 1;
1558 }
1559
1560 for (token, count) in tf_counts {
1562 if let Some(&vocab_index) = vocab.get(&token) {
1563 if let Some(&idf) = idf_scores.get(&token) {
1564 let tf = count as f32 / total_tokens;
1565 let tfidf = tf * idf;
1566
1567 let embedding_index = vocab_index % self.dimension;
1569 embedding[embedding_index] += tfidf;
1570 }
1571 }
1572 }
1573
1574 let magnitude: f32 = embedding.iter().map(|x| x * x).sum::<f32>().sqrt();
1576 if magnitude > 0.0 {
1577 for val in &mut embedding {
1578 *val /= magnitude;
1579 }
1580 }
1581
1582 Ok(embedding)
1583 }
1584
1585 async fn generate_batch_embeddings(
1586 &self,
1587 texts: Vec<&str>,
1588 ) -> Result<Vec<Vec<f32>>, ContextError> {
1589 let mut embeddings = Vec::new();
1590 for text in texts {
1591 embeddings.push(self.generate_embedding(text).await?);
1592 }
1593 Ok(embeddings)
1594 }
1595
1596 fn embedding_dimension(&self) -> usize {
1597 self.dimension
1598 }
1599
1600 fn max_text_length(&self) -> usize {
1601 16384 }
1603}