Skip to main content

cannyls/device/
command.rs

1//! デバイスに発行されるコマンド群の定義.
2use fibers::sync::oneshot;
3use futures::{Future, Poll};
4use std::ops::Range;
5use std::sync::mpsc::{Receiver, Sender};
6use trackable::error::ErrorKindExt;
7
8use deadline::Deadline;
9use lump::{LumpData, LumpHeader, LumpId};
10use storage::StorageUsage;
11use {Error, ErrorKind, Result};
12
/// Sending half of a `std::sync::mpsc` channel that carries `Command` values.
pub type CommandSender = Sender<Command>;
/// Receiving half of a `std::sync::mpsc` channel that carries `Command` values.
pub type CommandReceiver = Receiver<Command>;
15
/// A command issued to a device.
///
/// Every variant wraps the request struct that carries the parameters of the
/// corresponding operation.
#[derive(Debug)]
pub enum Command {
    /// Wraps a `PutLump` request.
    Put(PutLump),
    /// Wraps a `GetLump` request.
    Get(GetLump),
    /// Wraps a `HeadLump` request.
    Head(HeadLump),
    /// Wraps a `DeleteLump` request.
    Delete(DeleteLump),
    /// Wraps a `DeleteLumpRange` request.
    DeleteRange(DeleteLumpRange),
    /// Wraps a `ListLump` request.
    List(ListLump),
    /// Wraps a `ListLumpRange` request.
    ListRange(ListLumpRange),
    /// Wraps a `UsageLumpRange` request.
    UsageRange(UsageLumpRange),
    /// Wraps a `StopDevice` request (the only variant without a reply channel).
    Stop(StopDevice),
}
28impl Command {
29    pub fn deadline(&self) -> Deadline {
30        match *self {
31            Command::Put(ref c) => c.deadline,
32            Command::Get(ref c) => c.deadline,
33            Command::Head(ref c) => c.deadline,
34            Command::Delete(ref c) => c.deadline,
35            Command::DeleteRange(ref c) => c.deadline,
36            Command::List(ref c) => c.deadline,
37            Command::ListRange(ref c) => c.deadline,
38            Command::UsageRange(ref c) => c.deadline,
39            Command::Stop(ref c) => c.deadline,
40        }
41    }
42    pub fn prioritized(&self) -> bool {
43        match *self {
44            Command::Put(ref c) => c.prioritized,
45            Command::Get(ref c) => c.prioritized,
46            Command::Head(ref c) => c.prioritized,
47            Command::Delete(ref c) => c.prioritized,
48            Command::DeleteRange(ref c) => c.prioritized,
49            Command::List(ref c) => c.prioritized,
50            Command::ListRange(ref c) => c.prioritized,
51            Command::UsageRange(ref c) => c.prioritized,
52            Command::Stop(ref c) => c.prioritized,
53        }
54    }
55    pub fn failed(self, error: Error) {
56        match self {
57            Command::Put(c) => c.reply.send(Err(error)),
58            Command::Get(c) => c.reply.send(Err(error)),
59            Command::Head(c) => c.reply.send(Err(error)),
60            Command::Delete(c) => c.reply.send(Err(error)),
61            Command::DeleteRange(c) => c.reply.send(Err(error)),
62            Command::List(c) => c.reply.send(Err(error)),
63            Command::ListRange(c) => c.reply.send(Err(error)),
64            Command::UsageRange(c) => c.reply.send(Err(error)),
65            Command::Stop(_) => {}
66        }
67    }
68}
69
70/// `Result`の非同期版.
71#[derive(Debug)]
72pub struct AsyncResult<T>(oneshot::Monitor<T, Error>);
73impl<T> AsyncResult<T> {
74    #[allow(clippy::new_ret_no_self)]
75    fn new() -> (AsyncReply<T>, Self) {
76        let (tx, rx) = oneshot::monitor();
77        (AsyncReply(tx), AsyncResult(rx))
78    }
79}
80impl<T> Future for AsyncResult<T> {
81    type Item = T;
82    type Error = Error;
83    fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
84        track!(self
85            .0
86            .poll()
87            .map_err(|e| e.unwrap_or_else(|| ErrorKind::DeviceTerminated
88                .cause("monitoring channel disconnected")
89                .into())))
90    }
91}
92
93#[derive(Debug)]
94struct AsyncReply<T>(oneshot::Monitored<T, Error>);
95impl<T> AsyncReply<T> {
96    fn send(self, result: Result<T>) {
97        self.0.exit(result);
98    }
99}
100
101#[derive(Debug)]
102pub struct PutLump {
103    lump_id: LumpId,
104    lump_data: LumpData,
105    deadline: Deadline,
106    prioritized: bool,
107    journal_sync: bool,
108    reply: AsyncReply<bool>,
109}
110impl PutLump {
111    #[allow(clippy::new_ret_no_self)]
112    pub fn new(
113        lump_id: LumpId,
114        lump_data: LumpData,
115        deadline: Deadline,
116        prioritized: bool,
117        journal_sync: bool,
118    ) -> (Self, AsyncResult<bool>) {
119        let (reply, result) = AsyncResult::new();
120        let command = PutLump {
121            lump_id,
122            lump_data,
123            deadline,
124            prioritized,
125            journal_sync,
126            reply,
127        };
128        (command, result)
129    }
130    pub fn lump_id(&self) -> &LumpId {
131        &self.lump_id
132    }
133    pub fn lump_data(&self) -> &LumpData {
134        &self.lump_data
135    }
136    pub fn do_sync_journal(&self) -> bool {
137        self.journal_sync
138    }
139
140    pub fn reply(self, result: Result<bool>) {
141        self.reply.send(result)
142    }
143}
144
145#[derive(Debug)]
146pub struct GetLump {
147    lump_id: LumpId,
148    deadline: Deadline,
149    prioritized: bool,
150    reply: AsyncReply<Option<LumpData>>,
151}
152impl GetLump {
153    #[allow(clippy::new_ret_no_self)]
154    pub fn new(
155        lump_id: LumpId,
156        deadline: Deadline,
157        prioritized: bool,
158    ) -> (Self, AsyncResult<Option<LumpData>>) {
159        let (reply, result) = AsyncResult::new();
160        let command = GetLump {
161            lump_id,
162            deadline,
163            prioritized,
164            reply,
165        };
166        (command, result)
167    }
168    pub fn lump_id(&self) -> &LumpId {
169        &self.lump_id
170    }
171    pub fn reply(self, result: Result<Option<LumpData>>) {
172        self.reply.send(result);
173    }
174}
175
176#[derive(Debug)]
177pub struct HeadLump {
178    lump_id: LumpId,
179    deadline: Deadline,
180    prioritized: bool,
181    reply: AsyncReply<Option<LumpHeader>>,
182}
183impl HeadLump {
184    #[allow(clippy::new_ret_no_self)]
185    pub fn new(
186        lump_id: LumpId,
187        deadline: Deadline,
188        prioritized: bool,
189    ) -> (Self, AsyncResult<Option<LumpHeader>>) {
190        let (reply, result) = AsyncResult::new();
191        let command = HeadLump {
192            lump_id,
193            deadline,
194            prioritized,
195            reply,
196        };
197        (command, result)
198    }
199    pub fn lump_id(&self) -> &LumpId {
200        &self.lump_id
201    }
202    pub fn reply(self, result: Result<Option<LumpHeader>>) {
203        self.reply.send(result);
204    }
205}
206
207#[derive(Debug)]
208pub struct DeleteLump {
209    lump_id: LumpId,
210    deadline: Deadline,
211    prioritized: bool,
212    journal_sync: bool,
213    reply: AsyncReply<bool>,
214}
215impl DeleteLump {
216    #[allow(clippy::new_ret_no_self)]
217    pub fn new(
218        lump_id: LumpId,
219        deadline: Deadline,
220        prioritized: bool,
221        journal_sync: bool,
222    ) -> (Self, AsyncResult<bool>) {
223        let (reply, result) = AsyncResult::new();
224        let command = DeleteLump {
225            lump_id,
226            deadline,
227            prioritized,
228            journal_sync,
229            reply,
230        };
231        (command, result)
232    }
233    pub fn lump_id(&self) -> &LumpId {
234        &self.lump_id
235    }
236    pub fn do_sync_journal(&self) -> bool {
237        self.journal_sync
238    }
239    pub fn reply(self, result: Result<bool>) {
240        self.reply.send(result);
241    }
242}
243
244#[derive(Debug)]
245pub struct DeleteLumpRange {
246    range: Range<LumpId>,
247    deadline: Deadline,
248    prioritized: bool,
249    journal_sync: bool,
250    reply: AsyncReply<Vec<LumpId>>,
251}
252impl DeleteLumpRange {
253    #[allow(clippy::new_ret_no_self)]
254    pub fn new(
255        range: Range<LumpId>,
256        deadline: Deadline,
257        prioritized: bool,
258        journal_sync: bool,
259    ) -> (Self, AsyncResult<Vec<LumpId>>) {
260        let (reply, result) = AsyncResult::new();
261        let command = DeleteLumpRange {
262            range,
263            deadline,
264            prioritized,
265            journal_sync,
266            reply,
267        };
268        (command, result)
269    }
270    pub fn lump_range(&self) -> Range<LumpId> {
271        self.range.clone()
272    }
273    pub fn do_sync_journal(&self) -> bool {
274        self.journal_sync
275    }
276    pub fn reply(self, result: Result<Vec<LumpId>>) {
277        self.reply.send(result);
278    }
279}
280
281#[derive(Debug)]
282pub struct ListLump {
283    deadline: Deadline,
284    prioritized: bool,
285    reply: AsyncReply<Vec<LumpId>>,
286}
287impl ListLump {
288    #[allow(clippy::new_ret_no_self)]
289    pub fn new(deadline: Deadline, prioritized: bool) -> (Self, AsyncResult<Vec<LumpId>>) {
290        let (reply, result) = AsyncResult::new();
291        let command = ListLump {
292            deadline,
293            prioritized,
294            reply,
295        };
296        (command, result)
297    }
298    pub fn reply(self, result: Result<Vec<LumpId>>) {
299        self.reply.send(result);
300    }
301}
302
303#[derive(Debug)]
304pub struct ListLumpRange {
305    range: Range<LumpId>,
306    deadline: Deadline,
307    prioritized: bool,
308    reply: AsyncReply<Vec<LumpId>>,
309}
310impl ListLumpRange {
311    #[allow(clippy::new_ret_no_self)]
312    pub fn new(
313        range: Range<LumpId>,
314        deadline: Deadline,
315        prioritized: bool,
316    ) -> (Self, AsyncResult<Vec<LumpId>>) {
317        let (reply, result) = AsyncResult::new();
318        let command = ListLumpRange {
319            range,
320            deadline,
321            prioritized,
322            reply,
323        };
324        (command, result)
325    }
326    pub fn lump_range(&self) -> Range<LumpId> {
327        self.range.clone()
328    }
329    pub fn reply(self, result: Result<Vec<LumpId>>) {
330        self.reply.send(result);
331    }
332}
333
334#[derive(Debug)]
335pub struct UsageLumpRange {
336    range: Range<LumpId>,
337    deadline: Deadline,
338    prioritized: bool,
339    reply: AsyncReply<StorageUsage>,
340}
341impl UsageLumpRange {
342    #[allow(clippy::new_ret_no_self)]
343    pub fn new(
344        range: Range<LumpId>,
345        deadline: Deadline,
346        prioritized: bool,
347    ) -> (Self, AsyncResult<StorageUsage>) {
348        let (reply, result) = AsyncResult::new();
349        let command = UsageLumpRange {
350            range,
351            deadline,
352            prioritized,
353            reply,
354        };
355        (command, result)
356    }
357    pub fn lump_range(&self) -> Range<LumpId> {
358        self.range.clone()
359    }
360    pub fn reply(self, result: Result<StorageUsage>) {
361        self.reply.send(result);
362    }
363}
364
365#[derive(Debug)]
366pub struct StopDevice {
367    deadline: Deadline,
368    prioritized: bool,
369}
370impl StopDevice {
371    pub fn new(deadline: Deadline, prioritized: bool) -> Self {
372        StopDevice {
373            deadline,
374            prioritized,
375        }
376    }
377}