engula_supervisor/
server.rs

1// Copyright 2022 The Engula Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15use engula_apis::*;
16use tonic::{Request, Response};
17
18use crate::{apis::*, Database, Error, Result, Universe};
19
20#[derive(Clone)]
21pub struct Server {
22    uv: Universe,
23}
24
25impl Default for Server {
26    fn default() -> Self {
27        Self::new()
28    }
29}
30
31impl Server {
32    pub fn new() -> Self {
33        Self {
34            uv: Universe::new(),
35        }
36    }
37
38    pub fn into_service(self) -> supervisor_server::SupervisorServer<Self> {
39        supervisor_server::SupervisorServer::new(self)
40    }
41}
42
43impl Server {
44    async fn handle_database(&self, req: DatabaseRequest) -> Result<DatabaseResponse> {
45        let mut res = DatabaseResponse::default();
46        for req_union in req.requests {
47            let res_union = self.handle_database_union(req_union).await?;
48            res.responses.push(res_union);
49        }
50        Ok(res)
51    }
52
53    async fn handle_database_union(
54        &self,
55        req: DatabaseRequestUnion,
56    ) -> Result<DatabaseResponseUnion> {
57        let req = req
58            .request
59            .ok_or_else(|| Error::invalid_argument("missing database request"))?;
60        let res = match req {
61            database_request_union::Request::ListDatabases(_) => {
62                todo!();
63            }
64            database_request_union::Request::CreateDatabase(req) => {
65                let res = self.handle_create_database(req).await?;
66                database_response_union::Response::CreateDatabase(res)
67            }
68            database_request_union::Request::UpdateDatabase(_) => {
69                todo!();
70            }
71            database_request_union::Request::DeleteDatabase(_) => {
72                todo!();
73            }
74            database_request_union::Request::DescribeDatabase(req) => {
75                let res = self.handle_describe_database(req).await?;
76                database_response_union::Response::DescribeDatabase(res)
77            }
78        };
79        Ok(DatabaseResponseUnion {
80            response: Some(res),
81        })
82    }
83
84    async fn handle_create_database(
85        &self,
86        req: CreateDatabaseRequest,
87    ) -> Result<CreateDatabaseResponse> {
88        let desc = req
89            .desc
90            .ok_or_else(|| Error::invalid_argument("missing database description"))?;
91        let desc = self.uv.create_database(desc).await?;
92        Ok(CreateDatabaseResponse { desc: Some(desc) })
93    }
94
95    async fn handle_describe_database(
96        &self,
97        req: DescribeDatabaseRequest,
98    ) -> Result<DescribeDatabaseResponse> {
99        let db = self.uv.database(&req.name).await?;
100        let desc = db.desc().await;
101        Ok(DescribeDatabaseResponse { desc: Some(desc) })
102    }
103
104    async fn handle_collection(&self, req: CollectionRequest) -> Result<CollectionResponse> {
105        let db = self.uv.database(&req.dbname).await?;
106        let mut res = CollectionResponse::default();
107        for req_union in req.requests {
108            let res_union = self.handle_collection_union(db.clone(), req_union).await?;
109            res.responses.push(res_union);
110        }
111        Ok(res)
112    }
113
114    async fn handle_collection_union(
115        &self,
116        db: Database,
117        req: CollectionRequestUnion,
118    ) -> Result<CollectionResponseUnion> {
119        let req = req
120            .request
121            .ok_or_else(|| Error::invalid_argument("missing collection request"))?;
122        let res = match req {
123            collection_request_union::Request::ListCollections(_) => {
124                todo!();
125            }
126            collection_request_union::Request::CreateCollection(req) => {
127                let res = self.handle_create_collection(db, req).await?;
128                collection_response_union::Response::CreateCollection(res)
129            }
130            collection_request_union::Request::UpdateCollection(_) => {
131                todo!();
132            }
133            collection_request_union::Request::DeleteCollection(_) => {
134                todo!();
135            }
136            collection_request_union::Request::DescribeCollection(req) => {
137                let res = self.handle_describe_collection(db, req).await?;
138                collection_response_union::Response::DescribeCollection(res)
139            }
140        };
141        Ok(CollectionResponseUnion {
142            response: Some(res),
143        })
144    }
145
146    async fn handle_create_collection(
147        &self,
148        db: Database,
149        req: CreateCollectionRequest,
150    ) -> Result<CreateCollectionResponse> {
151        let desc = req
152            .desc
153            .ok_or_else(|| Error::invalid_argument("missing collection description"))?;
154        let desc = db.create_collection(desc).await?;
155        Ok(CreateCollectionResponse { desc: Some(desc) })
156    }
157
158    async fn handle_describe_collection(
159        &self,
160        db: Database,
161        req: DescribeCollectionRequest,
162    ) -> Result<DescribeCollectionResponse> {
163        let co = db.collection(&req.name).await?;
164        let desc = co.desc().await;
165        Ok(DescribeCollectionResponse { desc: Some(desc) })
166    }
167}
168
169#[tonic::async_trait]
170impl supervisor_server::Supervisor for Server {
171    async fn database(&self, req: Request<DatabaseRequest>) -> Result<Response<DatabaseResponse>> {
172        let req = req.into_inner();
173        let res = self.handle_database(req).await?;
174        Ok(Response::new(res))
175    }
176
177    async fn collection(
178        &self,
179        req: Request<CollectionRequest>,
180    ) -> Result<Response<CollectionResponse>> {
181        let req = req.into_inner();
182        let res = self.handle_collection(req).await?;
183        Ok(Response::new(res))
184    }
185}