oasysdb 0.7.3

Hybrid vector store with SQL integration & multi-index support.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
use super::*;
use futures::stream::StreamExt;
use sqlx::any::install_default_drivers;
use sqlx::Acquire;
use std::sync::{Arc, Mutex};
use tokio::runtime::Runtime;
use url::Url;
use uuid::Uuid;

// Type aliases for better readability.
/// Connection URL of the source SQL database, kept as a plain string.
type DatabaseURL = String;
/// Name under which an index is registered in the state and the pool.
type IndexName = String;
/// Path of the file an index is persisted to.
type IndexFile = PathBuf;
/// Shared, mutex-guarded handle to a boxed vector index trait object.
type Index = Arc<Mutex<Box<dyn VectorIndex>>>;
/// Mutex-guarded map from index name to its in-memory index handle.
type IndicesPool = Mutex<HashMap<IndexName, Index>>;

/// The vector database interface.
///
/// The database is responsible for managing:
/// - Data flow between the source database and the indices.
/// - High-level indices operation and management.
/// - Persistence, retrieval, and in-memory pool of vector indices.
pub struct Database {
    // Root directory; the state file and the `indices` and `tmp`
    // directories are all resolved relative to it.
    root: PathBuf,
    // Source URL and index references, guarded so they can be
    // modified through a shared `&self`.
    state: Mutex<DatabaseState>,
    // Indices currently held in memory, keyed by index name.
    pool: IndicesPool,
}

impl Database {
    /// Opens existing or creates a new vector database.
    /// - `root`: Root directory of the database.
    /// - `source_url`: URL to SQL database.
    ///
    /// This will attempt to restore the database state from the file first.
    /// If the file does not exist, it will create a new database.
    /// When creating a new database, a data source is required.
    ///
    /// Source URL examples:
    /// ```txt
    /// sqlite://sqlite.db
    /// mysql://user:password@localhost:3306/db
    /// postgresql://user:password@localhost:5432/db
    /// ```
    pub fn open(
        root: impl Into<PathBuf>,
        source_url: Option<impl Into<DatabaseURL>>,
    ) -> Result<Database, Error> {
        let root_dir: PathBuf = root.into();
        let indices_dir = root_dir.join("indices");
        if !indices_dir.try_exists()? {
            // Creating the indices directory will also create
            // the root directory if it doesn't exist.
            fs::create_dir_all(&indices_dir)?;
        }

        let tmp_dir = root_dir.join("tmp");
        if !tmp_dir.try_exists()? {
            fs::create_dir_all(&tmp_dir)?;
        }

        let state_file = root_dir.join("odbstate");
        let state = if state_file.try_exists()? {
            let mut state = DatabaseState::restore(&state_file)?;

            // If the source URL is provided, update the state.
            // This is useful in case the source URL has changed.
            if let Some(source) = source_url {
                state.with_source(source)?;
            }

            state
        } else {
            let source = source_url.ok_or_else(|| {
                let code = ErrorCode::MissingSource;
                let message = "Data source is required for a new database.";
                Error::new(code, message)
            })?;

            let indices = HashMap::new();
            let source = source.into();
            DatabaseState::validate_source(&source)?;

            // Persist the new state to the state file.
            let state = DatabaseState { source, indices };
            file::write_binary_file(tmp_dir, state_file, &state)?;
            state
        };

        install_default_drivers();

        let state = Mutex::new(state);
        let pool: IndicesPool = Mutex::new(HashMap::new());
        Ok(Self { root: root_dir, state, pool })
    }

    /// Creates a new index in the database asynchronously.
    /// - `name`: Name of the index.
    /// - `algorithm`: Indexing algorithm to use.
    /// - `config`: Index data source configuration.
    pub async fn async_create_index(
        &self,
        name: impl Into<IndexName>,
        algorithm: IndexAlgorithm,
        config: SourceConfig,
    ) -> Result<(), Error> {
        let index_name: IndexName = name.into();

        // Check if the index already exists in the database.
        if self.get_index_ref(&index_name).is_some() {
            let code = ErrorCode::RequestError;
            let message = format!("Index already exists: {index_name}.");
            return Err(Error::new(code, message));
        }

        // Query the source database for records.
        let query = config.to_query();
        let mut conn = self.state()?.async_connect().await?;
        let mut stream = sqlx::query(&query).fetch(conn.acquire().await?);

        // Process the rows from the query as records.
        let mut records = HashMap::new();
        while let Some(row) = stream.next().await {
            let row = row?;
            let (id, record) = config.to_record(&row)?;
            records.insert(id, record);
        }

        let index_file = {
            let uuid = Uuid::new_v4().to_string();
            self.indices_dir().join(uuid)
        };

        let mut index = algorithm.initialize()?;
        index.build(records)?;

        // Persist the index to a file.
        let tmp_dir = self.tmp_dir();
        algorithm.persist_index(tmp_dir, &index_file, index.as_ref())?;

        // Insert the index into the pool for easy access.
        {
            let mut pool = self.pool.lock()?;
            pool.insert(index_name.clone(), Arc::new(Mutex::new(index)));
        }

        // Update db state with the new index.
        // This closure is necessary to  make sure the lock is dropped
        // before persisting the state to the file.
        {
            let mut state = self.state.lock()?;
            let index_ref = IndexRef { algorithm, config, file: index_file };
            state.indices.insert(index_name, index_ref);
        }

        self.persist_state()?;
        Ok(())
    }

    /// Blocking counterpart of `async_create_index`.
    /// - `name`: Name of the index.
    /// - `algorithm`: Indexing algorithm to use.
    /// - `config`: Index data source configuration.
    ///
    /// A new Tokio runtime is created for the call and the
    /// asynchronous method is driven to completion on it.
    pub fn create_index(
        &self,
        name: impl Into<IndexName>,
        algorithm: IndexAlgorithm,
        config: SourceConfig,
    ) -> Result<(), Error> {
        let runtime = Runtime::new()?;
        let task = self.async_create_index(name, algorithm, config);
        runtime.block_on(task)
    }

    /// Looks up the reference of an index in the database state.
    /// - `name`: Index name.
    ///
    /// The returned value is a clone of the stored reference. It can be
    /// used to deserialize the index directly from its file and load it
    /// into memory as an index object. Returns `None` if the name is not
    /// registered or the state lock can't be acquired.
    pub fn get_index_ref(&self, name: impl AsRef<str>) -> Option<IndexRef> {
        let guard = self.state.lock().ok()?;
        guard.indices.get(name.as_ref()).cloned()
    }

    /// Retrieves an index and returns it as a trait object.
    /// - `name`: Index name.
    ///
    /// This method will return the index from the pool if it exists.
    /// Otherwise, it will load the index from the file and store it
    /// in the pool for future access.
    pub fn get_index(&self, name: impl AsRef<str>) -> Option<Index> {
        let name = name.as_ref();
        let IndexRef { algorithm, file, .. } = self.get_index_ref(name)?;

        // If the index is already in the indices pool, return it.
        let mut pool = self.pool.lock().ok()?;
        if let Some(index) = pool.get(name).cloned() {
            return Some(index);
        }

        // Load the index from the file and store it in the pool.
        // Then, return the index as a trait object.
        let index = algorithm.load_index(file).ok()?;
        let index: Index = Arc::new(Mutex::new(index));
        pool.insert(name.into(), index.clone());
        Some(index)
    }

    /// Retrieves an index and returns it in a result.
    /// - `name`: Index name.
    pub fn try_get_index(&self, name: impl AsRef<str>) -> Result<Index, Error> {
        let name = name.as_ref();
        self.get_index(name).ok_or_else(|| {
            let code = ErrorCode::NotFound;
            let message = format!("Index not found in database: {name}.");
            Error::new(code, message)
        })
    }

    /// Updates the index with new records from the source asynchronously.
    /// - `name`: Index name.
    ///
    /// This method checks the index for the last inserted record and queries
    /// the source database for new records after that checkpoint. It then
    /// updates the index with the new records and persists the index.
    ///
    /// # Errors
    ///
    /// Returns a not found error if the index is not registered in the
    /// database or can't be retrieved from the pool or its file. Errors
    /// from the source database and from the index are propagated.
    pub async fn async_refresh_index(
        &self,
        name: impl AsRef<str>,
    ) -> Result<(), Error> {
        let name = name.as_ref();

        // `get_index_ref` returns an owned reference, so the source
        // configuration can be moved out of it without cloning.
        let IndexRef { config, .. } =
            self.get_index_ref(name).ok_or_else(|| {
                let code = ErrorCode::NotFound;
                let message = format!("Index not found: {name}.");
                Error::new(code, message)
            })?;

        let query = {
            // The index lock is scoped to this block to make sure the
            // guard is dropped before async functionalities are called.
            // A failure to retrieve the index is returned as an error
            // instead of panicking.
            let index: Index = self.try_get_index(name)?;
            let index = index.lock()?;
            let meta = index.metadata();
            let checkpoint = meta.last_inserted.unwrap_or_default();
            config.to_query_after(&checkpoint)
        };

        let mut conn = self.state()?.async_connect().await?;
        let mut stream = sqlx::query(&query).fetch(conn.acquire().await?);

        // Process the rows from the database as records.
        let mut records = HashMap::new();
        while let Some(row) = stream.next().await {
            let row = row?;
            let (id, record) = config.to_record(&row)?;
            records.insert(id, record);
        }

        self.insert_into_index(name, records)
    }

    /// Blocking counterpart of `async_refresh_index`.
    /// - `name`: Index name.
    ///
    /// A new Tokio runtime is created for the call and the
    /// asynchronous method is driven to completion on it.
    pub fn refresh_index(&self, name: impl AsRef<str>) -> Result<(), Error> {
        let runtime = Runtime::new()?;
        let task = self.async_refresh_index(name);
        runtime.block_on(task)
    }

    /// Finds the nearest neighbors of a query vector in an index.
    /// - `name`: Index name.
    /// - `query`: Query vector.
    /// - `k`: Number of nearest neighbors to return.
    /// - `filters`: SQL-like filters to apply.
    ///
    /// The performance of this method depends on the indexing
    /// algorithm used when creating the index. ANNS algorithms
    /// may not return the exact nearest neighbors but perform
    /// much faster than linear search.
    pub fn search_index(
        &self,
        name: impl AsRef<str>,
        query: impl Into<Vector>,
        k: usize,
        filters: impl Into<Filters>,
    ) -> Result<Vec<SearchResult>, Error> {
        // The handle is bound to its own variable so that it
        // outlives the lock guard which borrows from it.
        let handle: Index = self.try_get_index(name)?;
        let guard = handle.lock()?;

        let vector: Vector = query.into();
        let filters: Filters = filters.into();
        guard.search(vector, k, filters)
    }

    /// Adds new records to the index and persists the index.
    /// - `name`: Index name.
    /// - `records`: Records to insert.
    pub fn insert_into_index(
        &self,
        name: impl AsRef<str>,
        records: HashMap<RecordID, Record>,
    ) -> Result<(), Error> {
        let name = name.as_ref();
        let handle: Index = self.try_get_index(name)?;

        // The index stays locked until it is written to its file.
        let mut guard = handle.lock()?;
        guard.insert(records)?;
        self.persist_existing_index(name, guard.as_ref())
    }

    /// Replaces record data in the index and persists the index.
    /// - `name`: Index name.
    /// - `records`: Records to update.
    ///
    /// This method will replace the existing record data of the provided
    /// ID with the new record data. If the record doesn't exist in the
    /// index, it will be ignored.
    pub fn update_index(
        &self,
        name: impl AsRef<str>,
        records: HashMap<RecordID, Record>,
    ) -> Result<(), Error> {
        let name = name.as_ref();
        let handle: Index = self.try_get_index(name)?;

        // The index stays locked until it is written to its file.
        let mut guard = handle.lock()?;
        guard.update(records)?;
        self.persist_existing_index(name, guard.as_ref())
    }

    /// Removes records from the index and persists the index.
    /// - `name`: Index name.
    /// - `ids`: List of record IDs to delete.
    pub fn delete_from_index(
        &self,
        name: impl AsRef<str>,
        ids: Vec<RecordID>,
    ) -> Result<(), Error> {
        let name = name.as_ref();
        let handle: Index = self.try_get_index(name)?;

        // The index stays locked until it is written to its file.
        let mut guard = handle.lock()?;
        guard.delete(ids)?;
        self.persist_existing_index(name, guard.as_ref())
    }

    /// Deletes an index from the database.
    /// - `name`: Index name.
    ///
    /// This method will remove the index from the pool and delete
    /// the index file from the disk. Returns an error if the index
    /// doesn't exist in the database.
    pub fn delete_index(&self, name: impl AsRef<str>) -> Result<(), Error> {
        let name = name.as_ref();
        let index_ref = {
            let mut state = self.state.lock()?;
            state.indices.remove(name).ok_or_else(|| {
                let code = ErrorCode::NotFound;
                let message = format!("Index doesn't exist: {name}.");
                Error::new(code, message)
            })?
        };

        self.release_indices(vec![name])?;
        fs::remove_file(index_ref.file())?;
        self.persist_state()
    }

    /// Loads indices to the pool if they are not already loaded.
    /// - `names`: Names of the indices.
    ///
    /// # Errors
    ///
    /// Returns a not found error, without loading anything, if any of the
    /// names is not registered in the database. An index that is
    /// registered but can't be loaded is also reported as an error
    /// instead of being skipped silently.
    pub fn load_indices(
        &self,
        names: Vec<impl AsRef<str>>,
    ) -> Result<(), Error> {
        {
            // Check the names against the state in place rather than on a
            // clone. The guard must be dropped before the loop below
            // because retrieving an index locks the state again.
            let state = self.state.lock()?;
            let is_missing = |name: &str| !state.indices.contains_key(name);
            if names.iter().any(|name| is_missing(name.as_ref())) {
                let code = ErrorCode::NotFound;
                let message = "Some indices are not found in the database.";
                return Err(Error::new(code, message));
            }
        }

        // Retrieving an index loads it into the pool as a side effect.
        for name in names {
            self.try_get_index(name)?;
        }

        Ok(())
    }

    /// Removes indices from the in-memory pool.
    /// - `names`: Names of the indices.
    ///
    /// This method can free up memory by removing indices from the pool.
    /// After the indices are released, when they need to be accessed again,
    /// they will be loaded from the file. Names which are not in the pool
    /// are skipped.
    ///
    /// Loading indices from the file might take some time. Therefore,
    /// it's recommended to keep the frequently used indices in the pool.
    pub fn release_indices(
        &self,
        names: Vec<impl AsRef<str>>,
    ) -> Result<(), Error> {
        let mut pool = self.pool.lock()?;
        names.iter().for_each(|name| {
            pool.remove(name.as_ref());
        });

        Ok(())
    }

    /// Returns a clone of the current database state.
    ///
    /// The state lock is only held while the state is being cloned.
    pub fn state(&self) -> Result<DatabaseState, Error> {
        Ok(self.state.lock()?.clone())
    }

    /// Writes the current database state to the state file.
    ///
    /// This method requires a Mutex lock to be available.
    /// If the lock is not available, this method will be suspended.
    /// When running this method with other state lock, make sure
    /// to release the lock before calling this method.
    pub fn persist_state(&self) -> Result<(), Error> {
        // Taking the snapshot is the step which locks the state.
        let snapshot = self.state()?;
        file::write_binary_file(self.tmp_dir(), self.state_file(), &snapshot)
    }
}

// Write internal database methods here.
impl Database {
    /// Returns the path of the state file: `odbstate` in the root directory.
    fn state_file(&self) -> PathBuf {
        let mut path = self.root.clone();
        path.push("odbstate");
        path
    }

    /// Returns the path of the `indices` directory in the root directory,
    /// which is where the index files are stored.
    fn indices_dir(&self) -> PathBuf {
        let mut path = self.root.clone();
        path.push("indices");
        path
    }

    /// Returns the path of the `tmp` directory in the root directory.
    fn tmp_dir(&self) -> PathBuf {
        let mut path = self.root.clone();
        path.push("tmp");
        path
    }

    /// Persists an existing index to its file.
    /// - `name`: Index name.
    /// - `index`: Index trait object.
    ///
    /// This method requires the reference to the index with the given
    /// name to exist in the database state. If the index doesn't exist,
    /// this method will return a not found error.
    fn persist_existing_index(
        &self,
        name: impl AsRef<str>,
        index: &dyn VectorIndex,
    ) -> Result<(), Error> {
        let name = name.as_ref();
        let IndexRef { algorithm, file, .. } =
            self.get_index_ref(name).ok_or_else(|| {
                let code = ErrorCode::NotFound;
                let message = format!("Index might not exists: {name}.");
                Error::new(code, message)
            })?;

        let tmp_dir = self.tmp_dir();
        algorithm.persist_index(tmp_dir, file, index)
    }
}

/// The state of the vector database.
///
/// This is the data written to and restored from the state file.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DatabaseState {
    // URL of the source SQL database.
    source: DatabaseURL,
    // References to the indices of the database, keyed by index name.
    indices: HashMap<IndexName, IndexRef>,
}

impl DatabaseState {
    /// Reads the database state back from a file.
    /// - `path`: Path to the state file.
    pub fn restore(path: impl AsRef<Path>) -> Result<DatabaseState, Error> {
        let state: DatabaseState = file::read_binary_file(path)?;
        Ok(state)
    }

    /// Replaces the source URL of the database state.
    /// - `source`: New source URL.
    ///
    /// The URL is validated first. When the validation fails,
    /// the error is returned and the current source is kept.
    pub fn with_source(
        &mut self,
        source: impl Into<DatabaseURL>,
    ) -> Result<(), Error> {
        let url: DatabaseURL = source.into();
        Self::validate_source(&url)?;

        self.source = url;
        Ok(())
    }

    /// Opens a connection to the source SQL database asynchronously.
    pub async fn async_connect(&self) -> Result<SourceConnection, Error> {
        let connection = SourceConnection::connect(&self.source).await?;
        Ok(connection)
    }

    /// Opens a connection to the source SQL database.
    ///
    /// A new Tokio runtime is created to drive `async_connect`.
    pub fn connect(&self) -> Result<SourceConnection, Error> {
        let runtime = Runtime::new()?;
        let task = self.async_connect();
        runtime.block_on(task)
    }

    /// Closes a connection to the source SQL database asynchronously.
    /// - `conn`: Database connection.
    pub async fn async_disconnect(conn: SourceConnection) -> Result<(), Error> {
        conn.close().await?;
        Ok(())
    }

    /// Closes a connection to the source SQL database.
    /// - `conn`: Database connection.
    ///
    /// A new Tokio runtime is created to drive `async_disconnect`.
    pub fn disconnect(conn: SourceConnection) -> Result<(), Error> {
        let runtime = Runtime::new()?;
        let task = Self::async_disconnect(conn);
        runtime.block_on(task)
    }

    /// Returns the type of the source database:
    /// - sqlite
    /// - mysql
    /// - postgresql
    pub fn source_type(&self) -> SourceType {
        // We can safely unwrap here because
        // we have already validated the source URL.
        let url = self.source.parse::<Url>().unwrap();
        url.scheme().into()
    }

    /// Validates the data source URL.
    ///
    /// The source URL scheme must be one of:
    /// - sqlite
    /// - mysql
    /// - postgresql
    pub fn validate_source(url: impl Into<DatabaseURL>) -> Result<(), Error> {
        let url = url.into();
        let url = url.parse::<Url>().map_err(|_| {
            let code = ErrorCode::InvalidSource;
            let message = "Invalid database source URL.";
            Error::new(code, message)
        })?;

        let valid_schemes = ["sqlite", "mysql", "postgresql"];
        if !valid_schemes.contains(&url.scheme()) {
            let code = ErrorCode::InvalidSource;
            let message = format!(
                "Unsupported database scheme. Choose between: {}.",
                valid_schemes.join(", ")
            );

            return Err(Error::new(code, message));
        }

        Ok(())
    }
}

/// Details about the index and where it is stored.
///
/// Index references are kept in the database state, keyed by index name.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IndexRef {
    // Configuration used to query the index records from the source.
    config: SourceConfig,
    // Algorithm used to initialize, load, and persist the index.
    algorithm: IndexAlgorithm,
    // File the index is persisted to.
    file: IndexFile,
}

// Read-only accessors for the private fields of the index reference.
impl IndexRef {
    /// Returns the source configuration of the index.
    ///
    /// The value is borrowed from the reference; nothing is cloned.
    pub fn config(&self) -> &SourceConfig {
        &self.config
    }

    /// Returns the type of the indexing algorithm of the index.
    ///
    /// The value is borrowed from the reference; nothing is cloned.
    pub fn algorithm(&self) -> &IndexAlgorithm {
        &self.algorithm
    }

    /// Returns the file path where the index is stored.
    ///
    /// The value is borrowed from the reference; nothing is cloned.
    pub fn file(&self) -> &IndexFile {
        &self.file
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use sqlx::{Executor, Row};
    use std::env;
    use std::sync::MutexGuard;

    // Name of the table created in the test source database.
    const TABLE: &str = "embeddings";
    // Name of the index created for every test database.
    const TEST_INDEX: &str = "test_index";

    #[test]
    fn test_database_open() {
        // Opening includes seeding the source and creating the test index.
        let opened = create_test_database();
        assert!(opened.is_ok());
    }

    #[test]
    fn test_database_create_index() -> Result<(), Error> {
        let db = create_test_database()?;

        let handle: Index = db.try_get_index(TEST_INDEX)?;
        let guard = handle.lock()?;
        let meta = guard.metadata();

        // The test source is seeded with 100 rows.
        assert_eq!(guard.len(), 100);
        assert_eq!(meta.last_inserted, Some(RecordID(100)));
        Ok(())
    }

    #[test]
    fn test_database_refresh_index() -> Result<(), Error> {
        let db = create_test_database()?;

        // Add 10 more rows to the source after the index was built.
        let insert_query = generate_insert_query(100, 10);
        let runtime = Runtime::new()?;
        runtime.block_on(db.async_execute_sql(insert_query))?;

        db.refresh_index(TEST_INDEX).unwrap();

        let handle: Index = db.try_get_index(TEST_INDEX)?;
        let guard = handle.lock()?;
        let meta = guard.metadata();

        assert_eq!(guard.len(), 110);
        assert_eq!(meta.last_inserted, Some(RecordID(110)));
        Ok(())
    }

    #[test]
    fn test_database_search_index_basic() {
        let db = create_test_database().unwrap();

        // Search with an all-zero query vector and no filters.
        let query = vec![0.0; 128];
        let outcome = db.search_index(TEST_INDEX, query, 5, Filters::NONE);
        let results = outcome.unwrap();

        assert_eq!(results.len(), 5);
        assert_eq!(results[0].id, RecordID(1));
        assert_eq!(results[0].distance, 0.0);
    }

    #[test]
    fn test_database_search_index_advanced() {
        let db = create_test_database().unwrap();

        // Same query vector as the basic test, with a metadata filter.
        let query = vec![0.0; 128];
        let outcome = db.search_index(TEST_INDEX, query, 5, "data >= 1050");
        let results = outcome.unwrap();

        assert_eq!(results.len(), 5);
        assert_eq!(results[0].id, RecordID(51));
    }

    #[test]
    fn test_database_insert_into_index() -> Result<(), Error> {
        // A single sample record with an ID that is not in the index yet.
        let record = Record {
            vector: Vector::from(vec![100.0; 128]),
            data: HashMap::from([(
                "number".to_string(),
                Some(DataValue::Integer(1100)),
            )]),
        };

        let records = HashMap::from([(RecordID(101), record)]);

        let db = create_test_database()?;
        db.insert_into_index(TEST_INDEX, records)?;

        let handle: Index = db.try_get_index(TEST_INDEX)?;
        let guard = handle.lock()?;
        assert_eq!(guard.len(), 101);
        Ok(())
    }

    #[test]
    fn test_database_delete_from_index() -> Result<(), Error> {
        let db = create_test_database()?;

        // Delete 2 of the 100 records in the index.
        let to_delete = vec![RecordID(1), RecordID(2)];
        db.delete_from_index(TEST_INDEX, to_delete)?;

        let handle: Index = db.try_get_index(TEST_INDEX)?;
        let guard = handle.lock()?;
        assert_eq!(guard.len(), 98);
        Ok(())
    }

    #[test]
    fn test_database_delete_index() {
        let db = create_test_database().unwrap();
        db.delete_index(TEST_INDEX).unwrap();

        // The index must no longer be registered in the state.
        let is_registered =
            db.state().unwrap().indices.contains_key(TEST_INDEX);

        assert!(!is_registered);
    }

    #[test]
    fn test_database_indices_pool() -> Result<(), Error> {
        let db = create_test_database()?;

        // The pool guard is a temporary in each lookup below, so the
        // pool is unlocked again before the next database call.
        db.release_indices(vec![TEST_INDEX])?;
        let pooled_after_release = db.pool()?.contains_key(TEST_INDEX);
        assert!(!pooled_after_release);

        db.load_indices(vec![TEST_INDEX])?;
        let pooled_after_load = db.pool()?.contains_key(TEST_INDEX);
        assert!(pooled_after_load);

        Ok(())
    }

    // Creates a fresh database in `odb_test` backed by a SQLite file in
    // the temporary directory, seeds the source, and builds the test index.
    fn create_test_database() -> Result<Database, Error> {
        // Start from a clean root directory.
        let root = PathBuf::from("odb_test");
        if root.try_exists()? {
            fs::remove_dir_all(&root)?;
        }

        let db_file = get_tmp_dir()?.join("sqlite.db");
        let source = format!("sqlite://{}?mode=rwc", db_file.display());

        let mut db = Database::open(root, Some(source.clone()))?;
        assert_eq!(db.state()?.source_type(), SourceType::SQLITE);

        let runtime = Runtime::new()?;
        runtime.block_on(setup_test_source(&source))?;

        create_test_index(&mut db)?;
        Ok(db)
    }

    // Creates the flat test index from the test table and
    // checks the algorithm recorded in its index reference.
    fn create_test_index(db: &mut Database) -> Result<(), Error> {
        let config = SourceConfig::new(TABLE, "id", "vector")
            .with_metadata(vec!["data"]);

        let algorithm = IndexAlgorithm::Flat(ParamsFlat::default());
        db.create_index(TEST_INDEX, algorithm, config)?;

        let registered = db.get_index_ref(TEST_INDEX).unwrap();
        assert_eq!(registered.algorithm().name(), "FLAT");
        Ok(())
    }

    // Generates an INSERT statement with `count` rows. For each `i` in
    // `start..start + count`, the row holds a 128-dimension vector filled
    // with `i`, serialized as JSON, and `1000 + i` as the data value.
    fn generate_insert_query(start: u8, count: u8) -> String {
        // Widen to u16 so that the end of the range can't overflow.
        let first = u16::from(start);
        let last = first + u16::from(count);

        let values = (first..last)
            .map(|i| {
                let vector = vec![f32::from(i); 128];
                let vector = serde_json::to_string(&vector).unwrap();
                let data = 1000 + i;
                format!("({vector:?}, {data})")
            })
            .collect::<Vec<String>>()
            .join(",\n");

        format!(
            "INSERT INTO {TABLE} (vector, data)
            VALUES {values}"
        )
    }

    // Returns the `oasysdb` directory inside the system
    // temporary directory, creating it when it doesn't exist.
    pub fn get_tmp_dir() -> Result<PathBuf, Error> {
        let mut dir = env::temp_dir();
        dir.push("oasysdb");

        let exists = dir.try_exists()?;
        if !exists {
            fs::create_dir_all(&dir)?;
        }

        Ok(dir)
    }

    // Recreates the test table in the source database,
    // seeds it with 100 rows, and verifies the row count.
    async fn setup_test_source(
        url: impl Into<DatabaseURL>,
    ) -> Result<(), Error> {
        let url: DatabaseURL = url.into();
        let mut conn = SourceConnection::connect(&url).await?;

        let drop_table = format!("DROP TABLE IF EXISTS {TABLE}");
        let create_table = format!(
            "CREATE TABLE IF NOT EXISTS {TABLE} (
                id INTEGER PRIMARY KEY,
                vector JSON NOT NULL,
                data INTEGER NOT NULL
            )"
        );

        let insert_records = generate_insert_query(0, 100);

        // Run the statements in order: drop, create, then insert.
        for statement in [&drop_table, &create_table, &insert_records] {
            conn.execute(statement.as_str()).await?;
        }

        let count_query = format!("SELECT COUNT(*) FROM {TABLE}");
        let row = conn.fetch_one(count_query.as_str()).await?;
        let count = row.get::<i64, usize>(0);

        assert_eq!(count, 100);
        Ok(())
    }

    // Helper methods for the database which are only available in tests.
    impl Database {
        // Locks the indices pool and returns the guard for inspection.
        fn pool(&self) -> Result<MutexGuard<HashMap<IndexName, Index>>, Error> {
            let guard = self.pool.lock()?;
            Ok(guard)
        }

        // Executes a SQL statement against the source database
        // over a newly opened connection.
        async fn async_execute_sql(
            &self,
            query: impl AsRef<str>,
        ) -> Result<(), Error> {
            let state = self.state()?;
            let mut conn = state.async_connect().await?;
            conn.execute(query.as_ref()).await?;
            Ok(())
        }
    }
}