engula_cooperator/
database.rs1use std::{collections::BTreeMap, sync::Arc};
16
17use engula_apis::*;
18use engula_supervisor::Supervisor;
19use tokio::sync::Mutex;
20
21use crate::{Collection, Result};
22
23#[derive(Clone)]
24pub struct Database {
25 inner: Arc<Mutex<Inner>>,
26}
27
28impl Database {
29 pub fn new(desc: DatabaseDesc, supervisor: Supervisor) -> Self {
30 let inner = Inner::new(desc, supervisor);
31 Self {
32 inner: Arc::new(Mutex::new(inner)),
33 }
34 }
35
36 pub async fn execute(&self, req: DatabaseTxnRequest) -> Result<DatabaseTxnResponse> {
37 let mut inner = self.inner.lock().await;
38 let mut res = DatabaseTxnResponse::default();
39 for coreq in req.requests {
40 let co = inner.collection(&coreq.name).await?;
41 let cores = co.execute(coreq).await?;
42 res.responses.push(cores);
43 }
44 Ok(res)
45 }
46}
47
48struct Inner {
49 sp: Supervisor,
50 desc: DatabaseDesc,
51 collections: BTreeMap<u64, Collection>,
52}
53
54impl Inner {
55 fn new(desc: DatabaseDesc, supervisor: Supervisor) -> Self {
56 Self {
57 sp: supervisor,
58 desc,
59 collections: BTreeMap::new(),
60 }
61 }
62
63 async fn collection(&mut self, name: &str) -> Result<Collection> {
64 let desc = self
65 .sp
66 .describe_collection(self.desc.name.clone(), name.to_owned())
67 .await?;
68 let co = self
69 .collections
70 .entry(desc.id)
71 .or_insert_with(Collection::new)
72 .clone();
73 Ok(co)
74 }
75}