1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
use futures::{Future, Poll};
use ::client::ClientHandle;
use ::error::*;
use ::op::Message;
#[derive(Clone)]
#[must_use = "queries can only be sent through a ClientHandle"]
pub struct RetryClientHandle<H: ClientHandle> {
client: H,
attempts: usize,
}
impl<H> RetryClientHandle<H> where H: ClientHandle {
pub fn new(client: H, attempts: usize) -> RetryClientHandle<H> {
RetryClientHandle { client: client, attempts: attempts }
}
}
impl<H> ClientHandle for RetryClientHandle<H> where H: ClientHandle + 'static {
fn send(&self, message: Message) -> Box<Future<Item=Message, Error=ClientError>> {
let future = self.client.send(message.clone());
return Box::new(RetrySendFuture{
message: message,
client: self.client.clone(),
future: future,
remaining_attempts: self.attempts
});
}
}
struct RetrySendFuture<H: ClientHandle> {
message: Message,
client: H,
future: Box<Future<Item=Message, Error=ClientError>>,
remaining_attempts: usize,
}
impl<H> Future for RetrySendFuture<H> where H: ClientHandle {
type Item = Message;
type Error = ClientError;
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
loop {
match self.future.poll() {
r @ Ok(_) => return r,
Err(e) => {
if self.remaining_attempts == 0 {
return Err(e);
}
self.remaining_attempts = self.remaining_attempts - 1;
self.future = self.client.send(self.message.clone());
}
}
}
}
}
#[cfg(test)]
mod test {
use std::cell::Cell;
use ::client::*;
use ::error::*;
use ::op::*;
use futures::*;
#[derive(Clone)]
struct TestClient { last_succeed: bool, retries: u16, attempts: Cell<u16> }
impl ClientHandle for TestClient {
fn send(&self, _: Message) -> Box<Future<Item=Message, Error=ClientError>> {
let i = self.attempts.get();
if i > self.retries || self.retries - i == 0 {
if self.last_succeed {
let mut message = Message::new();
message.id(i);
return Box::new(finished(message))
}
}
self.attempts.set(i + 1);
return Box::new(failed(ClientErrorKind::Message("last retry set to fail").into()))
}
}
#[test]
fn test_retry() {
let client = RetryClientHandle::new(TestClient{last_succeed: true, retries: 1, attempts: Cell::new(0)}, 2);
let test1 = Message::new();
let result = client.send(test1).wait().ok().expect("should have succeeded");
assert_eq!(result.get_id(), 1);
}
#[test]
fn test_error() {
let client = RetryClientHandle::new(TestClient{last_succeed: false, retries: 1, attempts: Cell::new(0)}, 2);
let test1 = Message::new();
assert!(client.send(test1).wait().is_err());
}
}