1use std::collections::HashMap;
7use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
8use std::sync::Arc;
9use std::time::Instant;
10
11use manifoldb_core::{CollectionId, EntityId, Value};
12use manifoldb_vector::{Embedding, SearchResult, VectorData, VectorError};
13
14pub trait VectorIndexProvider: Send + Sync {
19 fn search(
33 &self,
34 index_name: &str,
35 query: &Embedding,
36 k: usize,
37 ef_search: Option<usize>,
38 ) -> Result<Vec<SearchResult>, VectorError>;
39
40 fn has_index(&self, index_name: &str) -> bool;
42
43 fn dimension(&self, index_name: &str) -> Option<usize>;
45}
46
47pub trait CollectionVectorProvider: Send + Sync {
54 fn upsert_vector(
64 &self,
65 collection_id: CollectionId,
66 entity_id: EntityId,
67 collection_name: &str,
68 vector_name: &str,
69 data: &VectorData,
70 ) -> Result<(), VectorError>;
71
72 fn delete_vector(
74 &self,
75 collection_id: CollectionId,
76 entity_id: EntityId,
77 collection_name: &str,
78 vector_name: &str,
79 ) -> Result<bool, VectorError>;
80
81 fn delete_entity_vectors(
83 &self,
84 collection_id: CollectionId,
85 entity_id: EntityId,
86 collection_name: &str,
87 ) -> Result<usize, VectorError>;
88
89 fn get_vector(
91 &self,
92 collection_id: CollectionId,
93 entity_id: EntityId,
94 vector_name: &str,
95 ) -> Result<Option<VectorData>, VectorError>;
96
97 fn get_all_vectors(
99 &self,
100 collection_id: CollectionId,
101 entity_id: EntityId,
102 ) -> Result<std::collections::HashMap<String, VectorData>, VectorError>;
103
104 fn search(
106 &self,
107 collection_name: &str,
108 vector_name: &str,
109 query: &Embedding,
110 k: usize,
111 ef_search: Option<usize>,
112 ) -> Result<Vec<SearchResult>, VectorError>;
113}
114
115use super::graph_accessor::{GraphAccessor, NullGraphAccessor};
116
117pub struct ExecutionContext {
128 parameters: HashMap<u32, Value>,
130 cancelled: AtomicBool,
132 stats: ExecutionStats,
134 config: ExecutionConfig,
136 graph: Arc<dyn GraphAccessor>,
138 vector_index_provider: Option<Arc<dyn VectorIndexProvider>>,
140 collection_vector_provider: Option<Arc<dyn CollectionVectorProvider>>,
142}
143
144impl ExecutionContext {
145 #[must_use]
149 pub fn new() -> Self {
150 Self {
151 parameters: HashMap::new(),
152 cancelled: AtomicBool::new(false),
153 stats: ExecutionStats::new(),
154 config: ExecutionConfig::default(),
155 graph: Arc::new(NullGraphAccessor),
156 vector_index_provider: None,
157 collection_vector_provider: None,
158 }
159 }
160
161 #[must_use]
163 pub fn with_parameters(parameters: HashMap<u32, Value>) -> Self {
164 Self {
165 parameters,
166 cancelled: AtomicBool::new(false),
167 stats: ExecutionStats::new(),
168 config: ExecutionConfig::default(),
169 graph: Arc::new(NullGraphAccessor),
170 vector_index_provider: None,
171 collection_vector_provider: None,
172 }
173 }
174
175 #[must_use]
180 pub fn with_graph(mut self, graph: Arc<dyn GraphAccessor>) -> Self {
181 self.graph = graph;
182 self
183 }
184
185 #[inline]
187 #[must_use]
188 pub fn graph(&self) -> &dyn GraphAccessor {
189 self.graph.as_ref()
190 }
191
192 #[inline]
194 #[must_use]
195 pub fn graph_arc(&self) -> Arc<dyn GraphAccessor> {
196 Arc::clone(&self.graph)
197 }
198
199 #[must_use]
201 pub fn with_vector_index_provider(mut self, provider: Arc<dyn VectorIndexProvider>) -> Self {
202 self.vector_index_provider = Some(provider);
203 self
204 }
205
206 pub fn set_vector_index_provider(&mut self, provider: Arc<dyn VectorIndexProvider>) {
208 self.vector_index_provider = Some(provider);
209 }
210
211 #[must_use]
213 pub fn vector_index_provider(&self) -> Option<&dyn VectorIndexProvider> {
214 self.vector_index_provider.as_deref()
215 }
216
217 #[must_use]
222 pub fn vector_index_provider_arc(&self) -> Option<Arc<dyn VectorIndexProvider>> {
223 self.vector_index_provider.clone()
224 }
225
226 #[must_use]
228 pub fn with_collection_vector_provider(
229 mut self,
230 provider: Arc<dyn CollectionVectorProvider>,
231 ) -> Self {
232 self.collection_vector_provider = Some(provider);
233 self
234 }
235
236 pub fn set_collection_vector_provider(&mut self, provider: Arc<dyn CollectionVectorProvider>) {
238 self.collection_vector_provider = Some(provider);
239 }
240
241 #[must_use]
243 pub fn collection_vector_provider(&self) -> Option<&dyn CollectionVectorProvider> {
244 self.collection_vector_provider.as_deref()
245 }
246
247 #[must_use]
249 pub fn collection_vector_provider_arc(&self) -> Option<Arc<dyn CollectionVectorProvider>> {
250 self.collection_vector_provider.clone()
251 }
252
253 pub fn set_parameter(&mut self, index: u32, value: Value) {
255 self.parameters.insert(index, value);
256 }
257
258 #[inline]
260 #[must_use]
261 pub fn get_parameter(&self, index: u32) -> Option<&Value> {
262 self.parameters.get(&index)
263 }
264
265 #[inline]
267 #[must_use]
268 pub fn parameters(&self) -> &HashMap<u32, Value> {
269 &self.parameters
270 }
271
272 #[inline]
274 pub fn cancel(&self) {
275 self.cancelled.store(true, Ordering::SeqCst);
276 }
277
278 #[inline]
280 #[must_use]
281 pub fn is_cancelled(&self) -> bool {
282 self.cancelled.load(Ordering::SeqCst)
283 }
284
285 #[inline]
287 #[must_use]
288 pub fn stats(&self) -> &ExecutionStats {
289 &self.stats
290 }
291
292 #[inline]
294 #[must_use]
295 pub fn config(&self) -> &ExecutionConfig {
296 &self.config
297 }
298
299 pub fn config_mut(&mut self) -> &mut ExecutionConfig {
301 &mut self.config
302 }
303
304 #[inline]
306 pub fn record_rows_read(&self, count: u64) {
307 self.stats.rows_read.fetch_add(count, Ordering::Relaxed);
308 }
309
310 #[inline]
312 pub fn record_rows_produced(&self, count: u64) {
313 self.stats.rows_produced.fetch_add(count, Ordering::Relaxed);
314 }
315
316 #[inline]
318 pub fn record_rows_filtered(&self, count: u64) {
319 self.stats.rows_filtered.fetch_add(count, Ordering::Relaxed);
320 }
321
322 #[must_use]
324 pub fn with_config(mut self, config: ExecutionConfig) -> Self {
325 self.config = config;
326 self
327 }
328
329 #[inline]
333 #[must_use]
334 pub fn max_rows_in_memory(&self) -> usize {
335 self.config.max_rows_in_memory
336 }
337}
338
339impl Default for ExecutionContext {
340 fn default() -> Self {
341 Self::new()
342 }
343}
344
345impl std::fmt::Debug for ExecutionContext {
346 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
347 f.debug_struct("ExecutionContext")
348 .field("parameters", &self.parameters)
349 .field("cancelled", &self.cancelled)
350 .field("stats", &self.stats)
351 .field("config", &self.config)
352 .field("graph", &"<GraphAccessor>")
353 .field("vector_index_provider", &self.vector_index_provider.is_some())
354 .finish_non_exhaustive()
355 }
356}
357
/// Row counters plus a start timestamp for one execution context.
///
/// The counters are atomics so they can be incremented through a shared
/// reference.
#[derive(Debug)]
pub struct ExecutionStats {
    /// Captured at construction; `elapsed` measures from here.
    start_time: Instant,
    /// Rows read, bumped by `ExecutionContext::record_rows_read`.
    rows_read: AtomicU64,
    /// Rows produced, bumped by `ExecutionContext::record_rows_produced`.
    rows_produced: AtomicU64,
    /// Rows filtered, bumped by `ExecutionContext::record_rows_filtered`.
    rows_filtered: AtomicU64,
}
370
371impl ExecutionStats {
372 #[must_use]
374 pub fn new() -> Self {
375 Self {
376 start_time: Instant::now(),
377 rows_read: AtomicU64::new(0),
378 rows_produced: AtomicU64::new(0),
379 rows_filtered: AtomicU64::new(0),
380 }
381 }
382
383 #[inline]
385 #[must_use]
386 pub fn rows_read(&self) -> u64 {
387 self.rows_read.load(Ordering::Relaxed)
388 }
389
390 #[inline]
392 #[must_use]
393 pub fn rows_produced(&self) -> u64 {
394 self.rows_produced.load(Ordering::Relaxed)
395 }
396
397 #[inline]
399 #[must_use]
400 pub fn rows_filtered(&self) -> u64 {
401 self.rows_filtered.load(Ordering::Relaxed)
402 }
403
404 #[inline]
406 #[must_use]
407 pub fn elapsed(&self) -> std::time::Duration {
408 self.start_time.elapsed()
409 }
410}
411
412impl Default for ExecutionStats {
413 fn default() -> Self {
414 Self::new()
415 }
416}
417
/// Default value of `ExecutionConfig::max_rows_in_memory` (one million rows).
pub const DEFAULT_MAX_ROWS_IN_MEMORY: usize = 1_000 * 1_000;
420
/// Tunables for query execution.
#[derive(Debug, Clone)]
pub struct ExecutionConfig {
    /// Maximum batch size (presumably measured in rows); 1024 by default.
    pub max_batch_size: usize,
    /// Whether statistics collection is requested; off by default.
    /// NOTE(review): `ExecutionContext` updates its row counters regardless of
    /// this flag.
    pub collect_stats: bool,
    /// Memory limit; 0 by default. NOTE(review): the unit and the meaning of 0
    /// (presumably "no limit") are not established here — confirm with
    /// consumers.
    pub memory_limit: usize,
    /// Maximum number of rows to keep in memory; defaults to
    /// `DEFAULT_MAX_ROWS_IN_MEMORY`.
    pub max_rows_in_memory: usize,
}
440
441impl ExecutionConfig {
442 #[must_use]
444 pub const fn new() -> Self {
445 Self {
446 max_batch_size: 1024,
447 collect_stats: false,
448 memory_limit: 0,
449 max_rows_in_memory: DEFAULT_MAX_ROWS_IN_MEMORY,
450 }
451 }
452
453 #[must_use]
455 pub const fn with_batch_size(mut self, size: usize) -> Self {
456 self.max_batch_size = size;
457 self
458 }
459
460 #[must_use]
462 pub const fn with_stats(mut self) -> Self {
463 self.collect_stats = true;
464 self
465 }
466
467 #[must_use]
469 pub const fn with_memory_limit(mut self, limit: usize) -> Self {
470 self.memory_limit = limit;
471 self
472 }
473
474 #[must_use]
479 pub const fn with_max_rows_in_memory(mut self, limit: usize) -> Self {
480 self.max_rows_in_memory = limit;
481 self
482 }
483}
484
485impl Default for ExecutionConfig {
486 fn default() -> Self {
487 Self::new()
488 }
489}
490
/// A cloneable cancellation flag.
///
/// Clones share one flag through an `Arc`, so cancelling through any clone is
/// visible to all of them.
#[derive(Debug, Clone)]
pub struct CancellationToken {
    /// Shared flag; `true` once cancellation has been requested.
    cancelled: Arc<AtomicBool>,
}
499
500impl CancellationToken {
501 #[must_use]
503 pub fn new() -> Self {
504 Self { cancelled: Arc::new(AtomicBool::new(false)) }
505 }
506
507 #[inline]
509 pub fn cancel(&self) {
510 self.cancelled.store(true, Ordering::SeqCst);
511 }
512
513 #[inline]
515 #[must_use]
516 pub fn is_cancelled(&self) -> bool {
517 self.cancelled.load(Ordering::SeqCst)
518 }
519}
520
521impl Default for CancellationToken {
522 fn default() -> Self {
523 Self::new()
524 }
525}
526
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn context_parameters() {
        let mut ctx = ExecutionContext::new();
        ctx.set_parameter(1, Value::Int(42));
        ctx.set_parameter(2, Value::from("hello"));

        assert_eq!(ctx.get_parameter(1), Some(&Value::Int(42)));
        assert_eq!(ctx.get_parameter(2), Some(&Value::from("hello")));
        assert_eq!(ctx.get_parameter(3), None);
    }

    /// `with_parameters` installs the table and otherwise matches `new`.
    #[test]
    fn context_with_parameters() {
        let mut params = HashMap::new();
        params.insert(7, Value::Int(1));
        let ctx = ExecutionContext::with_parameters(params);

        assert_eq!(ctx.get_parameter(7), Some(&Value::Int(1)));
        assert_eq!(ctx.parameters().len(), 1);
        assert!(!ctx.is_cancelled());
        assert!(ctx.vector_index_provider().is_none());
        assert!(ctx.collection_vector_provider().is_none());
        assert_eq!(ctx.stats().rows_read(), 0);
        assert_eq!(ctx.max_rows_in_memory(), DEFAULT_MAX_ROWS_IN_MEMORY);
    }

    #[test]
    fn context_cancellation() {
        let ctx = ExecutionContext::new();
        assert!(!ctx.is_cancelled());
        ctx.cancel();
        assert!(ctx.is_cancelled());
    }

    #[test]
    fn context_stats() {
        let ctx = ExecutionContext::new();
        ctx.record_rows_read(100);
        ctx.record_rows_produced(50);
        ctx.record_rows_filtered(50);

        assert_eq!(ctx.stats().rows_read(), 100);
        assert_eq!(ctx.stats().rows_produced(), 50);
        assert_eq!(ctx.stats().rows_filtered(), 50);
    }

    #[test]
    fn config_defaults() {
        let config = ExecutionConfig::default();
        assert_eq!(config.max_batch_size, 1024);
        assert!(!config.collect_stats);
        assert_eq!(config.memory_limit, 0);
        assert_eq!(config.max_rows_in_memory, DEFAULT_MAX_ROWS_IN_MEMORY);
    }

    #[test]
    fn config_builders_and_context_accessors() {
        let config = ExecutionConfig::new()
            .with_batch_size(16)
            .with_stats()
            .with_memory_limit(1 << 20)
            .with_max_rows_in_memory(500);
        assert_eq!(config.max_batch_size, 16);
        assert!(config.collect_stats);
        assert_eq!(config.memory_limit, 1 << 20);
        assert_eq!(config.max_rows_in_memory, 500);

        let ctx = ExecutionContext::new().with_config(config);
        assert_eq!(ctx.max_rows_in_memory(), 500);
        assert_eq!(ctx.config().max_batch_size, 16);
    }

    #[test]
    fn config_mut_updates_in_place() {
        let mut ctx = ExecutionContext::new();
        ctx.config_mut().max_rows_in_memory = 3;
        assert_eq!(ctx.max_rows_in_memory(), 3);
    }

    #[test]
    fn cancellation_token() {
        let token = CancellationToken::new();
        assert!(!token.is_cancelled());

        let token2 = token.clone();
        token.cancel();

        assert!(token.is_cancelled());
        assert!(token2.is_cancelled());
    }

    #[test]
    fn cancellation_token_default_is_not_cancelled() {
        assert!(!CancellationToken::default().is_cancelled());
    }
}
573}