lbasedb 0.1.10

Low-level DBMS in Rust focusing on datasets.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
//! `Conn` is a basic structure for the connection that provides the full
//! interface to the DBMS.

use std::sync::Arc;
use std::collections::HashMap;

use tokio::io::{Result as TokioResult};
use tokio::task::JoinSet;
use tokio::fs::{create_dir_all, remove_dir_all, rename};
use tokio::sync::{Mutex, RwLock};

use crate::validate;
use crate::path_concat;
use crate::seq::Seq;
use crate::list::List;
use crate::items::{FeedItem, ColItem};
use crate::datatype::Dataunit;
use crate::dataset::{Dataset, get_dataset_size};


/// Connection object that manages all the entities. Since it interacts with 
/// the file system and supports asynchronous interface, there is no need 
/// to use it in a multi threading way.
pub struct Conn {
    // Path to the directory where the data and settings are stored
    path: String,

    // Feed list object to manage the feeds options (persistent file;
    // `feed_map` below is its in-memory view, built in `new`)
    feed_list: RwLock<List<FeedItem, String>>,

    // Feed mapping feed key -> feed
    feed_map: RwLock<HashMap<String, FeedItem>>,

    // Col list objects that is a mapping feed key -> the list
    // (one persistent col list file per feed)
    col_list_mapping: RwLock<HashMap<String, List<ColItem, String>>>,

    // Col mapping as double map feed key -> col key -> col
    col_map_mapping: RwLock<HashMap<String, HashMap<String, ColItem>>>,

    // Seq mapping as double map feed key -> col key -> seq.
    // Each seq sits behind Arc<Mutex<..>> so spawned tasks can share it.
    seq_mapping: RwLock<HashMap<String, HashMap<String, Arc<Mutex<Seq>>>>>,
}


impl Conn {
    /// Create a connection giving the path to the directory to store the data.
    /// If the path does not exist, the directory will be created.
    pub async fn new(path: &str) -> TokioResult<Self> {
        // Make sure the data directory is present
        create_dir_all(path).await?;

        // Open the persistent list of feeds
        let feed_list = List::<FeedItem, String>::new(
            Self::_get_feed_list_path(path)
        ).await?;

        // Build the connection with empty in-memory mappings
        let conn = Self {
            path: path.to_string(),
            feed_list: RwLock::new(feed_list),
            feed_map: RwLock::new(HashMap::new()),
            col_list_mapping: RwLock::new(HashMap::new()),
            col_map_mapping: RwLock::new(HashMap::new()),
            seq_mapping: RwLock::new(HashMap::new()),
        };

        // Open every feed found in the feed list file
        let feeds = conn.feed_list.write().await.map().await?;
        for (feed_name, feed_item) in feeds {
            conn._feed_open(&feed_name, feed_item).await?;
        }

        Ok(conn)
    }

    /// Return a copy of the directory path this connection stores data in.
    pub fn path(&self) -> String {
        self.path.to_owned()
    }

    /// List the feeds.
    pub async fn feed_list(&self) -> Vec<FeedItem> {
        let feeds = self.feed_map.read().await;
        feeds.values().cloned().collect::<Vec<FeedItem>>()
    }

    /// Check if the feed exists.
    pub async fn feed_exists(&self, feed_name: &str) -> bool {
        let feeds = self.feed_map.read().await;
        feeds.contains_key(feed_name)
    }

    /// Add a new feed by its name.
    pub async fn feed_add(&self, feed_name: &str) -> TokioResult<()> {
        // Fail if a feed with this name is already present
        validate!(!self.feed_exists(feed_name).await, 
                  AlreadyExists, feed_name)?;

        // Create the feed item (may fail on an invalid name)
        let item = FeedItem::new(feed_name)?;

        // Ensure the per-feed directory
        let dir = path_concat!(self.path.clone(), feed_name);
        create_dir_all(dir).await?;

        // Persist the new record in the feed list file
        self.feed_list.write().await.add(&item).await?;

        // Load the feed into the in-memory mappings
        self._feed_open(feed_name, item).await
    }

    /// Remove the feed by its name.
    pub async fn feed_remove(&self, feed_name: &str) -> TokioResult<()> {
        // The feed must exist
        validate!(self.feed_exists(feed_name).await, NotFound, feed_name)?;

        // Drop the feed from the in-memory mappings
        self._feed_close(feed_name).await;

        // Delete the record from the feed list file
        self.feed_list.write().await.remove(&feed_name.to_string()).await?;

        // Delete the feed directory with everything inside
        remove_dir_all(path_concat!(self.path.clone(), feed_name)).await?;

        Ok(())
    }

    /// Rename the feed.
    ///
    /// The feed is closed for the duration of the update and reopened under
    /// the new name afterwards; the first update error (if any) is returned
    /// after the feed has been reopened.
    pub async fn feed_rename(&self, name: &str, name_new: &str) -> 
                             TokioResult<()> {
        // Check whether they exist
        validate!(self.feed_exists(name).await, NotFound, name)?;
        validate!(!self.feed_exists(name_new).await, AlreadyExists, name_new)?;

        // Close the feed
        let mut feed_item = self._feed_close(name).await;

        // Run update. NOTE: the `async` block is essential here — inside it
        // `?` propagates into `res` instead of returning from the function
        // (a plain `{ .. }` block would early-return and leave the feed
        // closed), so the feed is always reopened below.
        let res: TokioResult<()> = async {
            // Update feed list
            feed_item.rename(name_new)?;
            self.feed_list.write().await
                .modify(&name.to_string(), &feed_item).await?;

            // Rename the directory
            let feed_path = path_concat!(self.path.clone(), name);
            let feed_path_new = path_concat!(self.path.clone(), name_new);
            rename(feed_path, feed_path_new).await?;

            // Ok
            Ok(())
        }.await;

        // Open the feed
        self._feed_open(name_new, feed_item).await?;

        // Raise error if happened
        res?;

        // Ok
        Ok(())
    }

    /// List columns of the feed.
    pub async fn col_list(&self, feed_name: &str) -> TokioResult<Vec<ColItem>> {
        // The feed must exist
        validate!(self.feed_exists(feed_name).await, NotFound, feed_name)?;

        // Clone every col item of the feed into a vector
        let cols = self.col_map_mapping.read().await;
        Ok(cols[feed_name].values().cloned().collect())
    }

    /// Check if the column exists in the feed.
    pub async fn col_exists(&self, feed_name: &str, 
                            col_name: &str) -> TokioResult<bool> {
        // The feed must exist
        validate!(self.feed_exists(feed_name).await, NotFound, feed_name)?;

        // Look the column up in the in-memory col map
        let cols = self.col_map_mapping.read().await;
        Ok(cols[feed_name].contains_key(col_name))
    }

    /// Rename the column.
    ///
    /// The column is closed for the duration of the update and reopened
    /// under the new name afterwards; the first update error (if any) is
    /// returned after the column has been reopened.
    pub async fn col_rename(&self, feed_name: &str, name: &str, 
                            name_new: &str) -> TokioResult<()> {
        // Check whether the feed exists
        validate!(self.feed_exists(feed_name).await, NotFound, feed_name)?;

        // Check whether the column exists
        validate!(self.col_exists(feed_name, name).await?, NotFound, name)?;
        validate!(!self.col_exists(feed_name, name_new).await?, 
                  AlreadyExists, name_new)?;

        // Close the col
        let mut col_item = self._col_close(feed_name, name).await;

        // Run update. NOTE: the `async` block is essential here — inside it
        // `?` propagates into `res` instead of returning from the function
        // (a plain `{ .. }` block would early-return and leave the column
        // closed), so the column is always reopened below.
        let res: TokioResult<()> = async {
            // Update col list
            col_item.rename(name_new)?;
            self.col_list_mapping.write().await.get_mut(feed_name).unwrap()
                .modify(&name.to_string(), &col_item).await?;

            // Rename the seq file
            let seq_path = Self::_get_seq_path(&self.path, feed_name, name);
            let seq_path_new = Self::_get_seq_path(&self.path, feed_name, 
                                                   name_new);
            rename(seq_path, seq_path_new).await?;

            // Ok
            Ok(())
        }.await;

        // Open the col
        self._col_open(feed_name, name_new, col_item).await?;

        // Raise error if happened
        res?;

        // Ok
        Ok(())
    }

    /// Add a new column by its name and datatype.
    pub async fn col_add(&self, feed_name: &str, col_name: &str, 
                         datatype: &str) -> TokioResult<()> {
        // The feed must exist
        validate!(self.feed_exists(feed_name).await, NotFound, feed_name)?;

        // A column with this name must not exist yet
        validate!(!self.col_exists(feed_name, col_name).await?, 
                  AlreadyExists, col_name)?;

        // Create the col item (may fail on an invalid name or datatype)
        let item = ColItem::new(col_name, datatype)?;

        // Persist the new record in the col list file of the feed
        self.col_list_mapping.write().await.get_mut(feed_name).unwrap()
            .add(&item).await?;

        // Load the col into the in-memory mappings
        self._col_open(feed_name, col_name, item).await?;

        // Grow the fresh seq file up to the current feed size
        let size = self.feed_map.read().await[feed_name].size;
        let seq = &self.seq_mapping.read().await[feed_name][col_name];
        seq.lock().await.resize(size).await?;

        Ok(())
    }

    /// Remove the column.
    pub async fn col_remove(&self, feed_name: &str, col_name: &str) -> 
                            TokioResult<()> {
        // The feed must exist
        validate!(self.feed_exists(feed_name).await, NotFound, feed_name)?;

        // The column must exist
        validate!(self.col_exists(feed_name, col_name).await?, 
                  NotFound, col_name)?;

        // Drop the col from the in-memory mappings
        self._col_close(feed_name, col_name).await;

        // Delete the record from the col list file of the feed
        self.col_list_mapping.write().await.get_mut(feed_name).unwrap()
            .remove(&col_name.to_string()).await?;

        // Delete the seq file of the column
        tokio::fs::remove_file(
            Self::_get_seq_path(&self.path, feed_name, col_name)
        ).await?;

        Ok(())
    }

    /// Get the size of the feed.
    pub async fn size_get(&self, feed_name: &str) -> TokioResult<usize> {
        // The feed must exist
        validate!(self.feed_exists(feed_name).await, NotFound, feed_name)?;

        // Read the size from the in-memory feed map
        let feeds = self.feed_map.read().await;
        Ok(feeds[feed_name].size)
    }

    /// Change the size of the feed including the sizes of all column files.
    /// Returns the previous size.
    ///
    /// # Errors
    /// Fails if the feed does not exist, if any column file cannot be
    /// resized, or if the feed list file cannot be updated.
    pub async fn size_set(&self, feed_name: &str, size: usize) -> 
                          TokioResult<usize> {
        // Check whether the feed exists
        validate!(self.feed_exists(feed_name).await, NotFound, feed_name)?;

        // Resize all seq files concurrently
        let mut js = JoinSet::new();
        for seq in self.seq_mapping.read().await[feed_name].values() {
            let seq_clone = Arc::clone(seq);
            js.spawn(async move {
                seq_clone.lock().await.resize(size).await
            });
        }
        // Propagate the first resize error; the results must not be
        // silently discarded, otherwise the feed size below would be
        // updated even though a column file failed to resize
        for res in js.join_all().await {
            res?;
        }

        // Change the size
        let mut feed_map = self.feed_map.write().await;
        let feed_item = feed_map.get_mut(feed_name).unwrap();
        let old_size = feed_item.size;
        feed_item.size = size;
        self.feed_list.write().await
            .modify(&feed_name.to_string(), feed_item).await?;

        // Return
        Ok(old_size)
    }

    /// Get dataset stored in the feed `feed_name`, having the size `size`
    /// and the columns `cols` with the offset `ix`.
    ///
    /// # Errors
    /// Fails if the feed or any requested column does not exist, if the
    /// range `ix..ix + size` goes past the feed end, or if a column file
    /// cannot be read.
    pub async fn data_get(&self, feed_name: &str, ix: usize, size: usize, 
                          cols: &[String]) -> TokioResult<Dataset> {
        // Check whether the feed exists
        validate!(self.feed_exists(feed_name).await, NotFound, feed_name)?;

        // Validate range
        validate!(ix + size <= self.feed_map.read().await[feed_name].size, 
                  UnexpectedEof, (ix + size).to_string())?;

        // One concurrent read task per requested column
        let mut js = JoinSet::new();

        for col_name in cols.iter() {
            // Check whether the column exists
            validate!(self.col_exists(feed_name, col_name).await?, 
                      NotFound, &**col_name)?;

            // Get datatype from col item
            let datatype = self.col_map_mapping.read().await
                [feed_name][col_name].datatype.clone();

            // Get seq object and clone the shared handle for the task
            let seq = &self.seq_mapping.read().await[feed_name][col_name];
            let seq_clone = Arc::clone(seq);

            // Clone col_name so the task owns it
            let col_name_clone = col_name.clone();

            // Spawn a concurrent task. A read failure is returned as an
            // error instead of unwrapped, so an I/O error cannot panic
            // the task.
            js.spawn(async move {
                let mut block = vec![0u8; size * datatype.size()];
                seq_clone.lock().await.get(ix, &mut block).await?;
                Ok::<_, std::io::Error>((block, datatype, col_name_clone))
            });
        }

        // Create an empty dataset
        let mut ds = HashMap::new();

        while let Some(res) = js.join_next().await {
            // The outer `?` surfaces join failures, the inner one the
            // read errors reported by the task itself
            let (block, datatype, col_name) = res??;

            // Convert bytes to a dataset series
            let series = block.chunks(datatype.size())
                .map(|chunk| datatype.from_bytes(chunk))
                .collect::<Vec<Dataunit>>();

            // Insert series into the dataset
            ds.insert(col_name, series);
        }

        Ok(ds)
    }

    /// Push the dataset to the feed. The missed columns will be zeros.
    pub async fn data_push(&self, feed_name: &str, ds: &Dataset) -> 
                           TokioResult<()> {
        // The feed must exist
        validate!(self.feed_exists(feed_name).await, NotFound, feed_name)?;

        // Number of records in the dataset (also validates the dataset)
        let size = get_dataset_size(ds)?;

        // Nothing to do for an empty dataset
        if size == 0 {
            return Ok(());
        }

        // Append: grow the feed first, then write at the old end offset
        let ix = self.feed_map.read().await[feed_name].size;
        self.size_set(feed_name, ix + size).await?;
        self.data_patch(feed_name, ix, ds).await
    }

    /// Update the records in the feed with the given dataset. Columns
    /// missing from the dataset are filled with zeros; use `data_patch`
    /// to leave them unchanged instead.
    pub async fn data_save(&self, feed_name: &str, ix: usize, 
                           ds: &Dataset) -> TokioResult<()> {
        // The feed must exist
        validate!(self.feed_exists(feed_name).await, NotFound, feed_name)?;

        // Every column of the feed takes part in the update
        let cols: Vec<String> = self.col_map_mapping.read().await[feed_name]
            .keys().cloned().collect();

        // Write the dataset into all the columns
        self._data_update(feed_name, ix, ds, &cols).await
    }

    /// Update the records in the feed with the given dataset. Columns
    /// missing from the dataset are left unchanged; use `data_save`
    /// to zero them instead.
    pub async fn data_patch(&self, feed_name: &str, ix: usize, 
                            ds: &Dataset) -> TokioResult<()> {
        // The feed must exist
        validate!(self.feed_exists(feed_name).await, NotFound, feed_name)?;

        // Only the columns present in the dataset take part in the update
        let cols: Vec<String> = ds.keys().cloned().collect();

        // Write the dataset into those columns
        self._data_update(feed_name, ix, ds, &cols).await
    }

    /// Get raw bytes having the size `size` (in data units) of the column 
    /// `col_name` in the feed `feed_name` with the offset `ix`.
    pub async fn raw_get(&self, feed_name: &str, col_name: &str, ix: usize, 
                         size: usize) -> TokioResult<Vec<u8>> {
        // The feed and the column must exist
        validate!(self.feed_exists(feed_name).await, NotFound, feed_name)?;
        validate!(self.col_exists(feed_name, col_name).await?, 
                  NotFound, col_name)?;

        // The requested range must fit inside the feed
        validate!(ix + size <= self.feed_map.read().await[feed_name].size, 
                  UnexpectedEof, (ix + size).to_string())?;

        // The datatype of the col defines the byte size of one data unit
        let unit_size = self.col_map_mapping
            .read().await[feed_name][col_name].datatype.size();

        // Read the requested units from the seq file into a buffer
        let seq = &self.seq_mapping.read().await[feed_name][col_name];
        let mut block = vec![0u8; size * unit_size];
        seq.lock().await.get(ix, &mut block).await?;

        Ok(block)
    }

    /// Update raw bytes from the `block` in the column `col_name` 
    /// of the feed `feed_name` with the offset `ix`.
    ///
    /// # Errors
    /// Fails if the feed or the column does not exist, if `block` is not
    /// a whole number of data units, or if the range goes past the feed
    /// end.
    pub async fn raw_set(&self, feed_name: &str, col_name: &str, ix: usize, 
                         block: &[u8]) -> TokioResult<()> {
        // Check whether the feed exists
        validate!(self.feed_exists(feed_name).await, NotFound, feed_name)?;

        // Check whether the column exists
        validate!(self.col_exists(feed_name, col_name).await?, 
                  NotFound, col_name)?;

        // Get seq object
        let seq_arc = &self.seq_mapping.read().await[feed_name][col_name];
        let mut seq = seq_arc.lock().await;

        // The block must hold a whole number of data units; otherwise the
        // integer division below would silently drop a trailing partial unit
        validate!(block.len() % seq.block_size() == 0, 
                  InvalidInput, block.len().to_string())?;

        // Validate range
        let end = ix + block.len() / seq.block_size();
        validate!(end <= self.feed_map.read().await[feed_name].size, 
                  UnexpectedEof, end.to_string())?;

        // Update the seq file with the block
        seq.update(ix, block).await?;

        // Ok
        Ok(())
    }

    // Write the series of `ds` into the columns `cols` of the feed at
    // offset `ix`. Columns listed in `cols` but absent from `ds` are
    // filled with zeros; columns not present in the feed are skipped.
    async fn _data_update(&self, feed_name: &str, ix: usize, ds: &Dataset, 
                          cols: &[String]) -> TokioResult<()> {
        // Get dataset size, it also checks that the dataset is valid: 
        // all series have the same size
        let size = get_dataset_size(ds)?;

        // Validate range
        validate!(ix + size <= self.feed_map.read().await[feed_name].size, 
                  UnexpectedEof, (ix + size).to_string())?;

        // If the dataset is not empty
        if size > 0 {
            // Create a join set
            let mut js = JoinSet::new();

            // Iterate the columns
            for col_name in cols.iter() {
                // Get col item because we need the datatype; columns the
                // feed does not have are silently skipped
                if let Some(col_item) = self.col_map_mapping.read()
                                            .await[feed_name].get(col_name) {
                    // Convert the series into a byte sequence. A conversion
                    // failure is propagated instead of panicking.
                    let block = if let Some(series) = ds.get(col_name) {
                        series.iter().map(
                            |unit| col_item.datatype.to_bytes(unit)
                        ).collect::<TokioResult<Vec<Vec<u8>>>>()?.concat()
                    } else {
                        // The column is not in the dataset: write zeros
                        vec![0u8; size * col_item.datatype.size()]
                    };

                    // Get seq object
                    let seq = &self.seq_mapping.read()
                                   .await[feed_name][col_name];

                    // Clone the seq
                    let seq_clone = Arc::clone(seq);

                    // Update the seq file with the block in parallel
                    js.spawn(async move {
                        seq_clone.lock().await.update(ix, &block).await
                    });
                }
            }

            // Wait for all writes and propagate the first error; the
            // results must not be silently discarded
            for res in js.join_all().await {
                res?;
            }
        }

        // Ok
        Ok(())
    }

    // Load the feed into the in-memory mappings: open its col list file
    // and a seq file for every column it declares.
    async fn _feed_open(&self, feed_name: &str, feed_item: FeedItem) -> 
                        TokioResult<()> {
        // Open the col list file of the feed and read its records
        let mut col_list = List::<ColItem, String>::new(
            Self::_get_col_list_path(&self.path, feed_name)
        ).await?;
        let col_map = col_list.map().await?;

        // Prepare empty per-feed entries in the col and seq mappings
        self.col_map_mapping.write().await
            .insert(feed_name.to_string(), HashMap::new());
        self.seq_mapping.write().await
            .insert(feed_name.to_string(), HashMap::new());

        // Open a seq file for every column of the feed
        for (col_name, col_item) in col_map {
            self._col_open(feed_name, &col_name, col_item).await?;
        }

        // Register the feed itself in the in-memory mappings
        self.feed_map.write().await.insert(feed_name.to_string(), feed_item);
        self.col_list_mapping.write().await
            .insert(feed_name.to_string(), col_list);

        Ok(())
    }

    // Drop the feed from every in-memory mapping; dropping the removed
    // values releases the underlying file handles. Returns the feed item.
    async fn _feed_close(&self, feed_name: &str) -> FeedItem {
        // The mappings are independent locks, so the removal order
        // does not matter
        self.seq_mapping.write().await.remove(feed_name);
        self.col_map_mapping.write().await.remove(feed_name);
        self.col_list_mapping.write().await.remove(feed_name);

        // Detach the feed item and hand it to the caller; the feed
        // must currently be open
        self.feed_map.write().await.remove(feed_name).unwrap()
    }

    // Open the seq file of a column and register the col item and the
    // shared seq handle in the in-memory mappings of the feed.
    async fn _col_open(&self, feed_name: &str, col_name: &str, 
                       col_item: ColItem) -> TokioResult<()> {
        // The datatype of the col defines the unit size of the seq file
        let seq = Seq::new(
            Self::_get_seq_path(&self.path, feed_name, col_name), 
            col_item.datatype.size()
        ).await?;

        // Register the col item and the seq handle under the feed
        self.col_map_mapping.write().await.get_mut(feed_name).unwrap()
            .insert(col_name.to_string(), col_item);
        self.seq_mapping.write().await.get_mut(feed_name).unwrap()
            .insert(col_name.to_string(), Arc::new(Mutex::new(seq)));

        Ok(())
    }

    // Drop the column from the in-memory mappings of the feed and return
    // its col item. Panics if the feed or the column is not open — callers
    // validate existence first.
    async fn _col_close(&self, feed_name: &str, col_name: &str) -> ColItem {
        // Close seq file by removing it from seq_mapping (dropping the
        // removed Arc releases the handle once no task still holds it)
        self.seq_mapping.write().await.get_mut(feed_name).unwrap()
            .remove(col_name);

        // Remove col item from col_map_mapping and return it
        self.col_map_mapping.write().await.get_mut(feed_name).unwrap()
            .remove(col_name).unwrap()
    }

    // Path of the file that lists all feeds, under the data directory.
    fn _get_feed_list_path(path: &str) -> String {
        path_concat!(path, "feed.list")
    }

    // Path of the col list file inside the directory of a feed.
    fn _get_col_list_path(path: &str, feed_name: &str) -> String {
        path_concat!(path, feed_name, "col.list")
    }

    // Path of the seq (data) file of a column: `<col_name>.col` inside
    // the directory of the feed.
    fn _get_seq_path(path: &str, feed_name: &str, col_name: &str) -> String {
        path_concat!(path, feed_name, format!("{}.col", col_name))
    }
}