containerd_snapshots/
wrap.rs

1/*
2   Copyright The containerd Authors.
3
4   Licensed under the Apache License, Version 2.0 (the "License");
5   you may not use this file except in compliance with the License.
6   You may obtain a copy of the License at
7
8       http://www.apache.org/licenses/LICENSE-2.0
9
10   Unless required by applicable law or agreed to in writing, software
11   distributed under the License is distributed on an "AS IS" BASIS,
12   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13   See the License for the specific language governing permissions and
14   limitations under the License.
15*/
16
//! Trait wrapper to serve gRPC requests.
18
19use std::{convert::TryInto, mem, sync::Arc};
20
21use futures::{stream::BoxStream, StreamExt};
22
23use crate::{
24    api::snapshots::v1::{
25        snapshots_server::{Snapshots, SnapshotsServer},
26        *,
27    },
28    Snapshotter,
29};
30
31pub struct Wrapper<S: Snapshotter> {
32    snapshotter: Arc<S>,
33}
34
35/// Helper to create snapshots server from any object that implements [Snapshotter] trait.
36pub fn server<S: Snapshotter>(snapshotter: Arc<S>) -> SnapshotsServer<Wrapper<S>> {
37    SnapshotsServer::new(Wrapper { snapshotter })
38}
39
40#[tonic::async_trait]
41impl<S: Snapshotter> Snapshots for Wrapper<S> {
42    async fn prepare(
43        &self,
44        request: tonic::Request<PrepareSnapshotRequest>,
45    ) -> Result<tonic::Response<PrepareSnapshotResponse>, tonic::Status> {
46        let request = request.into_inner();
47
48        let mounts = self
49            .snapshotter
50            .prepare(request.key, request.parent, request.labels)
51            .await
52            .map_err(Into::into)?;
53        let message = PrepareSnapshotResponse { mounts };
54        Ok(tonic::Response::new(message))
55    }
56
57    async fn view(
58        &self,
59        request: tonic::Request<ViewSnapshotRequest>,
60    ) -> Result<tonic::Response<ViewSnapshotResponse>, tonic::Status> {
61        let request = request.into_inner();
62        let mounts = self
63            .snapshotter
64            .view(request.key, request.parent, request.labels)
65            .await
66            .map_err(Into::into)?;
67        let message = ViewSnapshotResponse { mounts };
68        Ok(tonic::Response::new(message))
69    }
70
71    async fn mounts(
72        &self,
73        request: tonic::Request<MountsRequest>,
74    ) -> Result<tonic::Response<MountsResponse>, tonic::Status> {
75        let request = request.into_inner();
76        let mounts = self
77            .snapshotter
78            .mounts(request.key)
79            .await
80            .map_err(Into::into)?;
81        let message = MountsResponse { mounts };
82        Ok(tonic::Response::new(message))
83    }
84
85    async fn commit(
86        &self,
87        request: tonic::Request<CommitSnapshotRequest>,
88    ) -> Result<tonic::Response<()>, tonic::Status> {
89        let request = request.into_inner();
90        self.snapshotter
91            .commit(request.name, request.key, request.labels)
92            .await
93            .map_err(Into::into)?;
94        Ok(tonic::Response::new(()))
95    }
96
97    async fn remove(
98        &self,
99        request: tonic::Request<RemoveSnapshotRequest>,
100    ) -> Result<tonic::Response<()>, tonic::Status> {
101        let request = request.into_inner();
102        self.snapshotter
103            .remove(request.key)
104            .await
105            .map_err(Into::into)?;
106        Ok(tonic::Response::new(()))
107    }
108
109    async fn stat(
110        &self,
111        request: tonic::Request<StatSnapshotRequest>,
112    ) -> Result<tonic::Response<StatSnapshotResponse>, tonic::Status> {
113        let request = request.into_inner();
114        let info = self
115            .snapshotter
116            .stat(request.key)
117            .await
118            .map_err(Into::into)?;
119        let message = StatSnapshotResponse {
120            info: Some(info.into()),
121        };
122        Ok(tonic::Response::new(message))
123    }
124
125    async fn update(
126        &self,
127        request: tonic::Request<UpdateSnapshotRequest>,
128    ) -> Result<tonic::Response<UpdateSnapshotResponse>, tonic::Status> {
129        let request = request.into_inner();
130        let info = match request.info {
131            Some(info) => info,
132            None => return Err(tonic::Status::failed_precondition("info is required")),
133        };
134
135        let info = match info.try_into() {
136            Ok(info) => info,
137            Err(err) => {
138                let msg = format!("Failed to convert timestamp: {}", err);
139                return Err(tonic::Status::invalid_argument(msg));
140            }
141        };
142
143        let fields = request.update_mask.map(|mask| mask.paths);
144
145        let info = self
146            .snapshotter
147            .update(info, fields)
148            .await
149            .map_err(Into::into)?;
150        let message = UpdateSnapshotResponse {
151            info: Some(info.into()),
152        };
153
154        Ok(tonic::Response::new(message))
155    }
156
157    type ListStream = BoxStream<Result<ListSnapshotsResponse, tonic::Status>, 'static>;
158
159    async fn list(
160        &self,
161        request: tonic::Request<ListSnapshotsRequest>,
162    ) -> Result<tonic::Response<Self::ListStream>, tonic::Status> {
163        let request = request.into_inner();
164        let sn = self.snapshotter.clone();
165        let output = async_stream::try_stream! {
166            let walk_stream = sn.list(request.snapshotter, request.filters).await?;
167            pin_utils::pin_mut!(walk_stream);
168            let mut infos = Vec::<Info>::new();
169            while let Some(info) = walk_stream.next().await {
170                infos.push(info?.into());
171                if infos.len() >= 100 {
172                    yield ListSnapshotsResponse { info: mem::take(&mut infos) };
173                }
174            }
175
176            if !infos.is_empty() {
177                yield ListSnapshotsResponse { info: infos };
178            }
179        };
180        Ok(tonic::Response::new(Box::pin(output)))
181    }
182
183    async fn usage(
184        &self,
185        request: tonic::Request<UsageRequest>,
186    ) -> Result<tonic::Response<UsageResponse>, tonic::Status> {
187        let request = request.into_inner();
188
189        let usage = self
190            .snapshotter
191            .usage(request.key)
192            .await
193            .map_err(Into::into)?;
194        let message = UsageResponse {
195            size: usage.size,
196            inodes: usage.inodes,
197        };
198
199        Ok(tonic::Response::new(message))
200    }
201
202    async fn cleanup(
203        &self,
204        _request: tonic::Request<CleanupRequest>,
205    ) -> Result<tonic::Response<()>, tonic::Status> {
206        self.snapshotter.clear().await.map_err(Into::into)?;
207        Ok(tonic::Response::new(()))
208    }
209}