engula_supervisor/
supervisor.rs

1// Copyright 2022 The Engula Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use engula_apis::*;
16use tonic::Request;
17
18use crate::{apis::supervisor_server::Supervisor as _, Error, Result, Server};
19
20#[derive(Clone)]
21pub struct Supervisor {
22    server: Server,
23}
24
25impl Default for Supervisor {
26    fn default() -> Self {
27        Self::new()
28    }
29}
30
31impl Supervisor {
32    pub fn new() -> Self {
33        Self {
34            server: Server::new(),
35        }
36    }
37
38    pub async fn database(&self, req: DatabaseRequest) -> Result<DatabaseResponse> {
39        let req = Request::new(req);
40        let res = self.server.database(req).await?;
41        Ok(res.into_inner())
42    }
43
44    async fn database_union(
45        &self,
46        req: database_request_union::Request,
47    ) -> Result<database_response_union::Response> {
48        let req = DatabaseRequest {
49            requests: vec![DatabaseRequestUnion { request: Some(req) }],
50        };
51        let mut res = self.database(req).await?;
52        res.responses
53            .pop()
54            .and_then(|x| x.response)
55            .ok_or_else(|| Error::internal("missing database response"))
56    }
57
58    pub async fn describe_database(&self, name: String) -> Result<DatabaseDesc> {
59        let req = DescribeDatabaseRequest { name };
60        let req = database_request_union::Request::DescribeDatabase(req);
61        let res = self.database_union(req).await?;
62        let desc = if let database_response_union::Response::DescribeDatabase(res) = res {
63            res.desc
64        } else {
65            None
66        };
67        desc.ok_or_else(|| Error::internal("missing database description"))
68    }
69
70    pub async fn collection(&self, req: CollectionRequest) -> Result<CollectionResponse> {
71        let req = Request::new(req);
72        let res = self.server.collection(req).await?;
73        Ok(res.into_inner())
74    }
75
76    async fn collection_union(
77        &self,
78        dbname: String,
79        req: collection_request_union::Request,
80    ) -> Result<collection_response_union::Response> {
81        let req = CollectionRequest {
82            dbname,
83            requests: vec![CollectionRequestUnion { request: Some(req) }],
84        };
85        let mut res = self.collection(req).await?;
86        res.responses
87            .pop()
88            .and_then(|x| x.response)
89            .ok_or_else(|| Error::internal("missing collection response"))
90    }
91
92    pub async fn describe_collection(
93        &self,
94        dbname: String,
95        coname: String,
96    ) -> Result<CollectionDesc> {
97        let req = DescribeCollectionRequest { name: coname };
98        let req = collection_request_union::Request::DescribeCollection(req);
99        let res = self.collection_union(dbname, req).await?;
100        let desc = if let collection_response_union::Response::DescribeCollection(res) = res {
101            res.desc
102        } else {
103            None
104        };
105        desc.ok_or_else(|| Error::internal("missing collection description"))
106    }
107}