1use fibers::sync::oneshot;
3use futures::{Future, Poll};
4use std::ops::Range;
5use std::sync::mpsc::{Receiver, Sender};
6use trackable::error::ErrorKindExt;
7
8use deadline::Deadline;
9use lump::{LumpData, LumpHeader, LumpId};
10use storage::StorageUsage;
11use {Error, ErrorKind, Result};
12
13pub type CommandSender = Sender<Command>;
14pub type CommandReceiver = Receiver<Command>;
15
16#[derive(Debug)]
17pub enum Command {
18 Put(PutLump),
19 Get(GetLump),
20 Head(HeadLump),
21 Delete(DeleteLump),
22 DeleteRange(DeleteLumpRange),
23 List(ListLump),
24 ListRange(ListLumpRange),
25 UsageRange(UsageLumpRange),
26 Stop(StopDevice),
27}
28impl Command {
29 pub fn deadline(&self) -> Deadline {
30 match *self {
31 Command::Put(ref c) => c.deadline,
32 Command::Get(ref c) => c.deadline,
33 Command::Head(ref c) => c.deadline,
34 Command::Delete(ref c) => c.deadline,
35 Command::DeleteRange(ref c) => c.deadline,
36 Command::List(ref c) => c.deadline,
37 Command::ListRange(ref c) => c.deadline,
38 Command::UsageRange(ref c) => c.deadline,
39 Command::Stop(ref c) => c.deadline,
40 }
41 }
42 pub fn prioritized(&self) -> bool {
43 match *self {
44 Command::Put(ref c) => c.prioritized,
45 Command::Get(ref c) => c.prioritized,
46 Command::Head(ref c) => c.prioritized,
47 Command::Delete(ref c) => c.prioritized,
48 Command::DeleteRange(ref c) => c.prioritized,
49 Command::List(ref c) => c.prioritized,
50 Command::ListRange(ref c) => c.prioritized,
51 Command::UsageRange(ref c) => c.prioritized,
52 Command::Stop(ref c) => c.prioritized,
53 }
54 }
55 pub fn failed(self, error: Error) {
56 match self {
57 Command::Put(c) => c.reply.send(Err(error)),
58 Command::Get(c) => c.reply.send(Err(error)),
59 Command::Head(c) => c.reply.send(Err(error)),
60 Command::Delete(c) => c.reply.send(Err(error)),
61 Command::DeleteRange(c) => c.reply.send(Err(error)),
62 Command::List(c) => c.reply.send(Err(error)),
63 Command::ListRange(c) => c.reply.send(Err(error)),
64 Command::UsageRange(c) => c.reply.send(Err(error)),
65 Command::Stop(_) => {}
66 }
67 }
68}
69
70#[derive(Debug)]
72pub struct AsyncResult<T>(oneshot::Monitor<T, Error>);
73impl<T> AsyncResult<T> {
74 #[allow(clippy::new_ret_no_self)]
75 fn new() -> (AsyncReply<T>, Self) {
76 let (tx, rx) = oneshot::monitor();
77 (AsyncReply(tx), AsyncResult(rx))
78 }
79}
80impl<T> Future for AsyncResult<T> {
81 type Item = T;
82 type Error = Error;
83 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
84 track!(self
85 .0
86 .poll()
87 .map_err(|e| e.unwrap_or_else(|| ErrorKind::DeviceTerminated
88 .cause("monitoring channel disconnected")
89 .into())))
90 }
91}
92
93#[derive(Debug)]
94struct AsyncReply<T>(oneshot::Monitored<T, Error>);
95impl<T> AsyncReply<T> {
96 fn send(self, result: Result<T>) {
97 self.0.exit(result);
98 }
99}
100
101#[derive(Debug)]
102pub struct PutLump {
103 lump_id: LumpId,
104 lump_data: LumpData,
105 deadline: Deadline,
106 prioritized: bool,
107 journal_sync: bool,
108 reply: AsyncReply<bool>,
109}
110impl PutLump {
111 #[allow(clippy::new_ret_no_self)]
112 pub fn new(
113 lump_id: LumpId,
114 lump_data: LumpData,
115 deadline: Deadline,
116 prioritized: bool,
117 journal_sync: bool,
118 ) -> (Self, AsyncResult<bool>) {
119 let (reply, result) = AsyncResult::new();
120 let command = PutLump {
121 lump_id,
122 lump_data,
123 deadline,
124 prioritized,
125 journal_sync,
126 reply,
127 };
128 (command, result)
129 }
130 pub fn lump_id(&self) -> &LumpId {
131 &self.lump_id
132 }
133 pub fn lump_data(&self) -> &LumpData {
134 &self.lump_data
135 }
136 pub fn do_sync_journal(&self) -> bool {
137 self.journal_sync
138 }
139
140 pub fn reply(self, result: Result<bool>) {
141 self.reply.send(result)
142 }
143}
144
145#[derive(Debug)]
146pub struct GetLump {
147 lump_id: LumpId,
148 deadline: Deadline,
149 prioritized: bool,
150 reply: AsyncReply<Option<LumpData>>,
151}
152impl GetLump {
153 #[allow(clippy::new_ret_no_self)]
154 pub fn new(
155 lump_id: LumpId,
156 deadline: Deadline,
157 prioritized: bool,
158 ) -> (Self, AsyncResult<Option<LumpData>>) {
159 let (reply, result) = AsyncResult::new();
160 let command = GetLump {
161 lump_id,
162 deadline,
163 prioritized,
164 reply,
165 };
166 (command, result)
167 }
168 pub fn lump_id(&self) -> &LumpId {
169 &self.lump_id
170 }
171 pub fn reply(self, result: Result<Option<LumpData>>) {
172 self.reply.send(result);
173 }
174}
175
176#[derive(Debug)]
177pub struct HeadLump {
178 lump_id: LumpId,
179 deadline: Deadline,
180 prioritized: bool,
181 reply: AsyncReply<Option<LumpHeader>>,
182}
183impl HeadLump {
184 #[allow(clippy::new_ret_no_self)]
185 pub fn new(
186 lump_id: LumpId,
187 deadline: Deadline,
188 prioritized: bool,
189 ) -> (Self, AsyncResult<Option<LumpHeader>>) {
190 let (reply, result) = AsyncResult::new();
191 let command = HeadLump {
192 lump_id,
193 deadline,
194 prioritized,
195 reply,
196 };
197 (command, result)
198 }
199 pub fn lump_id(&self) -> &LumpId {
200 &self.lump_id
201 }
202 pub fn reply(self, result: Result<Option<LumpHeader>>) {
203 self.reply.send(result);
204 }
205}
206
207#[derive(Debug)]
208pub struct DeleteLump {
209 lump_id: LumpId,
210 deadline: Deadline,
211 prioritized: bool,
212 journal_sync: bool,
213 reply: AsyncReply<bool>,
214}
215impl DeleteLump {
216 #[allow(clippy::new_ret_no_self)]
217 pub fn new(
218 lump_id: LumpId,
219 deadline: Deadline,
220 prioritized: bool,
221 journal_sync: bool,
222 ) -> (Self, AsyncResult<bool>) {
223 let (reply, result) = AsyncResult::new();
224 let command = DeleteLump {
225 lump_id,
226 deadline,
227 prioritized,
228 journal_sync,
229 reply,
230 };
231 (command, result)
232 }
233 pub fn lump_id(&self) -> &LumpId {
234 &self.lump_id
235 }
236 pub fn do_sync_journal(&self) -> bool {
237 self.journal_sync
238 }
239 pub fn reply(self, result: Result<bool>) {
240 self.reply.send(result);
241 }
242}
243
244#[derive(Debug)]
245pub struct DeleteLumpRange {
246 range: Range<LumpId>,
247 deadline: Deadline,
248 prioritized: bool,
249 journal_sync: bool,
250 reply: AsyncReply<Vec<LumpId>>,
251}
252impl DeleteLumpRange {
253 #[allow(clippy::new_ret_no_self)]
254 pub fn new(
255 range: Range<LumpId>,
256 deadline: Deadline,
257 prioritized: bool,
258 journal_sync: bool,
259 ) -> (Self, AsyncResult<Vec<LumpId>>) {
260 let (reply, result) = AsyncResult::new();
261 let command = DeleteLumpRange {
262 range,
263 deadline,
264 prioritized,
265 journal_sync,
266 reply,
267 };
268 (command, result)
269 }
270 pub fn lump_range(&self) -> Range<LumpId> {
271 self.range.clone()
272 }
273 pub fn do_sync_journal(&self) -> bool {
274 self.journal_sync
275 }
276 pub fn reply(self, result: Result<Vec<LumpId>>) {
277 self.reply.send(result);
278 }
279}
280
281#[derive(Debug)]
282pub struct ListLump {
283 deadline: Deadline,
284 prioritized: bool,
285 reply: AsyncReply<Vec<LumpId>>,
286}
287impl ListLump {
288 #[allow(clippy::new_ret_no_self)]
289 pub fn new(deadline: Deadline, prioritized: bool) -> (Self, AsyncResult<Vec<LumpId>>) {
290 let (reply, result) = AsyncResult::new();
291 let command = ListLump {
292 deadline,
293 prioritized,
294 reply,
295 };
296 (command, result)
297 }
298 pub fn reply(self, result: Result<Vec<LumpId>>) {
299 self.reply.send(result);
300 }
301}
302
303#[derive(Debug)]
304pub struct ListLumpRange {
305 range: Range<LumpId>,
306 deadline: Deadline,
307 prioritized: bool,
308 reply: AsyncReply<Vec<LumpId>>,
309}
310impl ListLumpRange {
311 #[allow(clippy::new_ret_no_self)]
312 pub fn new(
313 range: Range<LumpId>,
314 deadline: Deadline,
315 prioritized: bool,
316 ) -> (Self, AsyncResult<Vec<LumpId>>) {
317 let (reply, result) = AsyncResult::new();
318 let command = ListLumpRange {
319 range,
320 deadline,
321 prioritized,
322 reply,
323 };
324 (command, result)
325 }
326 pub fn lump_range(&self) -> Range<LumpId> {
327 self.range.clone()
328 }
329 pub fn reply(self, result: Result<Vec<LumpId>>) {
330 self.reply.send(result);
331 }
332}
333
334#[derive(Debug)]
335pub struct UsageLumpRange {
336 range: Range<LumpId>,
337 deadline: Deadline,
338 prioritized: bool,
339 reply: AsyncReply<StorageUsage>,
340}
341impl UsageLumpRange {
342 #[allow(clippy::new_ret_no_self)]
343 pub fn new(
344 range: Range<LumpId>,
345 deadline: Deadline,
346 prioritized: bool,
347 ) -> (Self, AsyncResult<StorageUsage>) {
348 let (reply, result) = AsyncResult::new();
349 let command = UsageLumpRange {
350 range,
351 deadline,
352 prioritized,
353 reply,
354 };
355 (command, result)
356 }
357 pub fn lump_range(&self) -> Range<LumpId> {
358 self.range.clone()
359 }
360 pub fn reply(self, result: Result<StorageUsage>) {
361 self.reply.send(result);
362 }
363}
364
365#[derive(Debug)]
366pub struct StopDevice {
367 deadline: Deadline,
368 prioritized: bool,
369}
370impl StopDevice {
371 pub fn new(deadline: Deadline, prioritized: bool) -> Self {
372 StopDevice {
373 deadline,
374 prioritized,
375 }
376 }
377}