containerd_snapshots/
wrap.rs1use std::{convert::TryInto, mem, sync::Arc};
20
21use futures::{stream::BoxStream, StreamExt};
22
23use crate::{
24 api::snapshots::v1::{
25 snapshots_server::{Snapshots, SnapshotsServer},
26 *,
27 },
28 Snapshotter,
29};
30
31pub struct Wrapper<S: Snapshotter> {
32 snapshotter: Arc<S>,
33}
34
35pub fn server<S: Snapshotter>(snapshotter: Arc<S>) -> SnapshotsServer<Wrapper<S>> {
37 SnapshotsServer::new(Wrapper { snapshotter })
38}
39
40#[tonic::async_trait]
41impl<S: Snapshotter> Snapshots for Wrapper<S> {
42 async fn prepare(
43 &self,
44 request: tonic::Request<PrepareSnapshotRequest>,
45 ) -> Result<tonic::Response<PrepareSnapshotResponse>, tonic::Status> {
46 let request = request.into_inner();
47
48 let mounts = self
49 .snapshotter
50 .prepare(request.key, request.parent, request.labels)
51 .await
52 .map_err(Into::into)?;
53 let message = PrepareSnapshotResponse { mounts };
54 Ok(tonic::Response::new(message))
55 }
56
57 async fn view(
58 &self,
59 request: tonic::Request<ViewSnapshotRequest>,
60 ) -> Result<tonic::Response<ViewSnapshotResponse>, tonic::Status> {
61 let request = request.into_inner();
62 let mounts = self
63 .snapshotter
64 .view(request.key, request.parent, request.labels)
65 .await
66 .map_err(Into::into)?;
67 let message = ViewSnapshotResponse { mounts };
68 Ok(tonic::Response::new(message))
69 }
70
71 async fn mounts(
72 &self,
73 request: tonic::Request<MountsRequest>,
74 ) -> Result<tonic::Response<MountsResponse>, tonic::Status> {
75 let request = request.into_inner();
76 let mounts = self
77 .snapshotter
78 .mounts(request.key)
79 .await
80 .map_err(Into::into)?;
81 let message = MountsResponse { mounts };
82 Ok(tonic::Response::new(message))
83 }
84
85 async fn commit(
86 &self,
87 request: tonic::Request<CommitSnapshotRequest>,
88 ) -> Result<tonic::Response<()>, tonic::Status> {
89 let request = request.into_inner();
90 self.snapshotter
91 .commit(request.name, request.key, request.labels)
92 .await
93 .map_err(Into::into)?;
94 Ok(tonic::Response::new(()))
95 }
96
97 async fn remove(
98 &self,
99 request: tonic::Request<RemoveSnapshotRequest>,
100 ) -> Result<tonic::Response<()>, tonic::Status> {
101 let request = request.into_inner();
102 self.snapshotter
103 .remove(request.key)
104 .await
105 .map_err(Into::into)?;
106 Ok(tonic::Response::new(()))
107 }
108
109 async fn stat(
110 &self,
111 request: tonic::Request<StatSnapshotRequest>,
112 ) -> Result<tonic::Response<StatSnapshotResponse>, tonic::Status> {
113 let request = request.into_inner();
114 let info = self
115 .snapshotter
116 .stat(request.key)
117 .await
118 .map_err(Into::into)?;
119 let message = StatSnapshotResponse {
120 info: Some(info.into()),
121 };
122 Ok(tonic::Response::new(message))
123 }
124
125 async fn update(
126 &self,
127 request: tonic::Request<UpdateSnapshotRequest>,
128 ) -> Result<tonic::Response<UpdateSnapshotResponse>, tonic::Status> {
129 let request = request.into_inner();
130 let info = match request.info {
131 Some(info) => info,
132 None => return Err(tonic::Status::failed_precondition("info is required")),
133 };
134
135 let info = match info.try_into() {
136 Ok(info) => info,
137 Err(err) => {
138 let msg = format!("Failed to convert timestamp: {}", err);
139 return Err(tonic::Status::invalid_argument(msg));
140 }
141 };
142
143 let fields = request.update_mask.map(|mask| mask.paths);
144
145 let info = self
146 .snapshotter
147 .update(info, fields)
148 .await
149 .map_err(Into::into)?;
150 let message = UpdateSnapshotResponse {
151 info: Some(info.into()),
152 };
153
154 Ok(tonic::Response::new(message))
155 }
156
157 type ListStream = BoxStream<Result<ListSnapshotsResponse, tonic::Status>, 'static>;
158
159 async fn list(
160 &self,
161 request: tonic::Request<ListSnapshotsRequest>,
162 ) -> Result<tonic::Response<Self::ListStream>, tonic::Status> {
163 let request = request.into_inner();
164 let sn = self.snapshotter.clone();
165 let output = async_stream::try_stream! {
166 let walk_stream = sn.list(request.snapshotter, request.filters).await?;
167 pin_utils::pin_mut!(walk_stream);
168 let mut infos = Vec::<Info>::new();
169 while let Some(info) = walk_stream.next().await {
170 infos.push(info?.into());
171 if infos.len() >= 100 {
172 yield ListSnapshotsResponse { info: mem::take(&mut infos) };
173 }
174 }
175
176 if !infos.is_empty() {
177 yield ListSnapshotsResponse { info: infos };
178 }
179 };
180 Ok(tonic::Response::new(Box::pin(output)))
181 }
182
183 async fn usage(
184 &self,
185 request: tonic::Request<UsageRequest>,
186 ) -> Result<tonic::Response<UsageResponse>, tonic::Status> {
187 let request = request.into_inner();
188
189 let usage = self
190 .snapshotter
191 .usage(request.key)
192 .await
193 .map_err(Into::into)?;
194 let message = UsageResponse {
195 size: usage.size,
196 inodes: usage.inodes,
197 };
198
199 Ok(tonic::Response::new(message))
200 }
201
202 async fn cleanup(
203 &self,
204 _request: tonic::Request<CleanupRequest>,
205 ) -> Result<tonic::Response<()>, tonic::Status> {
206 self.snapshotter.clear().await.map_err(Into::into)?;
207 Ok(tonic::Response::new(()))
208 }
209}