engula_cooperator/
database.rs

1// Copyright 2022 The Engula Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use std::{collections::BTreeMap, sync::Arc};
16
17use engula_apis::*;
18use engula_supervisor::Supervisor;
19use tokio::sync::Mutex;
20
21use crate::{Collection, Result};
22
23#[derive(Clone)]
24pub struct Database {
25    inner: Arc<Mutex<Inner>>,
26}
27
28impl Database {
29    pub fn new(desc: DatabaseDesc, supervisor: Supervisor) -> Self {
30        let inner = Inner::new(desc, supervisor);
31        Self {
32            inner: Arc::new(Mutex::new(inner)),
33        }
34    }
35
36    pub async fn execute(&self, req: DatabaseTxnRequest) -> Result<DatabaseTxnResponse> {
37        let mut inner = self.inner.lock().await;
38        let mut res = DatabaseTxnResponse::default();
39        for coreq in req.requests {
40            let co = inner.collection(&coreq.name).await?;
41            let cores = co.execute(coreq).await?;
42            res.responses.push(cores);
43        }
44        Ok(res)
45    }
46}
47
48struct Inner {
49    sp: Supervisor,
50    desc: DatabaseDesc,
51    collections: BTreeMap<u64, Collection>,
52}
53
54impl Inner {
55    fn new(desc: DatabaseDesc, supervisor: Supervisor) -> Self {
56        Self {
57            sp: supervisor,
58            desc,
59            collections: BTreeMap::new(),
60        }
61    }
62
63    async fn collection(&mut self, name: &str) -> Result<Collection> {
64        let desc = self
65            .sp
66            .describe_collection(self.desc.name.clone(), name.to_owned())
67            .await?;
68        let co = self
69            .collections
70            .entry(desc.id)
71            .or_insert_with(Collection::new)
72            .clone();
73        Ok(co)
74    }
75}