1extern crate cookie;
41extern crate redis;
42
43use finchers;
44use finchers::endpoint::{ApplyContext, ApplyResult, Endpoint};
45use finchers::error::Error;
46use finchers::input::Input;
47
48use std::borrow::Cow;
49use std::fmt;
50use std::mem;
51use std::sync::Arc;
52use std::time::Duration;
53
54use self::redis::async::Connection;
55#[doc(no_inline)]
56pub use self::redis::Client;
57use self::redis::RedisFuture;
58
59use self::cookie::Cookie;
60use futures::{Async, Future, Poll};
61use uuid::Uuid;
62
63use session::{RawSession, Session};
64
65#[derive(Debug)]
66struct RedisSessionConfig {
67 key_prefix: String,
68 cookie_name: Cow<'static, str>,
69 timeout: Option<Duration>,
70}
71
72impl RedisSessionConfig {
73 fn key_name(&self, id: &Uuid) -> String {
74 format!("{}:{}", self.key_prefix, id)
75 }
76
77 fn get_session_id(&self, input: &mut Input) -> Result<Option<Uuid>, Error> {
78 if let Some(cookie) = input.cookies()?.get(&self.cookie_name) {
79 let session_id: Uuid = cookie
80 .value()
81 .parse()
82 .map_err(finchers::error::bad_request)?;
83 return Ok(Some(session_id));
84 }
85 Ok(None)
86 }
87}
88
89#[derive(Debug, Clone)]
91pub struct RedisBackend {
92 client: Client,
93 config: Arc<RedisSessionConfig>,
94}
95
96impl RedisBackend {
97 pub fn new(client: Client) -> RedisBackend {
99 RedisBackend {
100 client,
101 config: Arc::new(RedisSessionConfig {
102 key_prefix: "finchers-sesssion".into(),
103 cookie_name: "session-id".into(),
104 timeout: None,
105 }),
106 }
107 }
108
109 fn config_mut(&mut self) -> &mut RedisSessionConfig {
110 Arc::get_mut(&mut self.config).expect("The instance has already shared.")
111 }
112
113 pub fn key_prefix(mut self, prefix: impl Into<String>) -> RedisBackend {
118 self.config_mut().key_prefix = prefix.into();
119 self
120 }
121
122 pub fn cookie_name(mut self, name: impl Into<Cow<'static, str>>) -> RedisBackend {
126 self.config_mut().cookie_name = name.into();
127 self
128 }
129
130 pub fn timeout(mut self, timeout: Duration) -> RedisBackend {
132 self.config_mut().timeout = Some(timeout);
133 self
134 }
135}
136
137impl<'a> Endpoint<'a> for RedisBackend {
138 type Output = (Session<RedisSession>,);
139 type Future = ReadFuture;
140
141 fn apply(&self, cx: &mut ApplyContext<'_>) -> ApplyResult<Self::Future> {
142 match self.config.get_session_id(cx.input()) {
143 Ok(session_id) => Ok(ReadFuture::connecting(
144 &self.client,
145 &self.config,
146 session_id,
147 )),
148 Err(err) => Ok(ReadFuture::failed(err)),
149 }
150 }
151}
152
153#[doc(hidden)]
154#[allow(missing_debug_implementations)]
155pub struct ReadFuture {
156 state: ReadFutureState,
157}
158
159#[allow(missing_debug_implementations)]
160enum ReadFutureState {
161 Failed(Option<Error>),
162 Connecting {
163 future: RedisFuture<Connection>,
164 config: Arc<RedisSessionConfig>,
165 session_id: Option<Uuid>,
166 },
167 Fetch {
168 future: RedisFuture<(Connection, Option<String>)>,
169 config: Arc<RedisSessionConfig>,
170 session_id: Uuid,
171 },
172 Done,
173}
174
175impl ReadFuture {
176 fn failed(err: Error) -> ReadFuture {
177 ReadFuture {
178 state: ReadFutureState::Failed(Some(err)),
179 }
180 }
181
182 fn connecting(
183 client: &Client,
184 config: &Arc<RedisSessionConfig>,
185 session_id: Option<Uuid>,
186 ) -> ReadFuture {
187 ReadFuture {
188 state: ReadFutureState::Connecting {
189 future: client.get_async_connection(),
190 config: config.clone(),
191 session_id,
192 },
193 }
194 }
195}
196
197impl Future for ReadFuture {
198 type Item = (Session<RedisSession>,);
199 type Error = Error;
200
201 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
202 use self::ReadFutureState::*;
203 loop {
204 let (conn, value) = match self.state {
205 Failed(ref mut err) => {
206 return Err(err.take().expect("This future has alread polled."))
207 }
208 Connecting { ref mut future, .. } => {
209 let conn = try_ready!(future.poll().map_err(finchers::error::fail));
210 (Some(conn), None)
211 }
212 Fetch { ref mut future, .. } => {
213 let (conn, value) = try_ready!(future.poll().map_err(finchers::error::fail));
214 (Some(conn), value)
215 }
216 Done => panic!("unexpected state"),
217 };
218
219 match (mem::replace(&mut self.state, Done), conn, value) {
220 (
221 Connecting {
222 config,
223 session_id: Some(session_id),
224 ..
225 },
226 Some(conn),
227 None,
228 ) => {
229 self.state = Fetch {
230 future: redis::cmd("GET")
231 .arg(config.key_name(&session_id))
232 .query_async(conn),
233 config,
234 session_id,
235 };
236 }
237
238 (
239 Fetch {
240 config, session_id, ..
241 },
242 Some(conn),
243 Some(value),
244 ) => {
245 return Ok(Async::Ready((Session::new(RedisSession {
246 conn,
247 config,
248 session_id: Some(session_id),
249 value: Some(value),
250 }),)))
251 }
252
253 (
254 Connecting {
255 config,
256 session_id: None,
257 ..
258 },
259 Some(conn),
260 None,
261 )
262 | (Fetch { config, .. }, Some(conn), None) => {
263 return Ok(Async::Ready((Session::new(RedisSession {
264 conn,
265 config,
266 session_id: None,
267 value: None,
268 }),)));
269 }
270
271 _ => unreachable!("unexpected condition"),
272 }
273 }
274 }
275}
276
277#[allow(missing_docs)]
280pub struct RedisSession {
281 conn: Connection,
282 config: Arc<RedisSessionConfig>,
283 session_id: Option<Uuid>,
284 value: Option<String>,
285}
286
287impl fmt::Debug for RedisSession {
288 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
289 f.debug_struct("RedisSession")
290 .field("config", &self.config)
291 .field("session_id", &self.session_id)
292 .field("value", &self.value)
293 .finish()
294 }
295}
296
297impl RawSession for RedisSession {
298 type WriteFuture = WriteFuture;
299
300 fn get(&self) -> Option<&str> {
301 self.value.as_ref().map(|s| s.as_ref())
302 }
303
304 fn set(&mut self, value: String) {
305 self.value = Some(value);
306 }
307
308 fn remove(&mut self) {
309 self.value = None;
310 }
311
312 fn write(self, input: &mut Input) -> Self::WriteFuture {
313 let Self {
314 conn,
315 config,
316 session_id,
317 value,
318 } = self;
319
320 match (session_id, value) {
321 (Some(session_id), None) => {
322 match input.cookies() {
323 Ok(jar) => jar.remove(Cookie::named(config.cookie_name.clone())),
324 Err(err) => return WriteFuture::failed(err),
325 }
326 let redis_key = config.key_name(&session_id);
327 WriteFuture::cmd(conn, redis::cmd("DEL").arg(redis_key))
328 }
329 (session_id, Some(value)) => {
330 let session_id = session_id.unwrap_or_else(Uuid::new_v4);
331 match input.cookies() {
332 Ok(jar) => jar.add(Cookie::new(
333 config.cookie_name.clone(),
334 session_id.to_string(),
335 )),
336 Err(err) => return WriteFuture::failed(err),
337 }
338 let redis_key = config.key_name(&session_id);
339
340 if let Some(timeout) = config.timeout {
341 WriteFuture::cmd(
342 conn,
343 redis::cmd("SETEX")
344 .arg(redis_key)
345 .arg(timeout.as_secs())
346 .arg(value),
347 )
348 } else {
349 WriteFuture::cmd(conn, redis::cmd("SET").arg(redis_key).arg(value))
350 }
351 }
352 (None, None) => WriteFuture::no_op(),
353 }
354 }
355}
356
357#[doc(hidden)]
358#[allow(missing_debug_implementations)]
359pub struct WriteFuture {
360 state: WriteFutureState,
361}
362
363enum WriteFutureState {
364 Noop,
365 Failed(Option<Error>),
366 Cmd(RedisFuture<(Connection, ())>),
367}
368
369impl WriteFuture {
370 fn no_op() -> WriteFuture {
371 WriteFuture {
372 state: WriteFutureState::Noop,
373 }
374 }
375
376 fn failed(err: Error) -> WriteFuture {
377 WriteFuture {
378 state: WriteFutureState::Failed(Some(err)),
379 }
380 }
381
382 fn cmd(conn: Connection, cmd: &redis::Cmd) -> WriteFuture {
383 WriteFuture {
384 state: WriteFutureState::Cmd(cmd.query_async::<_, ()>(conn)),
385 }
386 }
387}
388
389impl Future for WriteFuture {
390 type Item = ();
391 type Error = Error;
392
393 #[inline]
394 fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
395 use self::WriteFutureState::*;
396 match self.state {
397 Noop => Ok(().into()),
398 Failed(ref mut err) => Err(err.take().expect("The future has already polled.")),
399 Cmd(ref mut future) => {
400 let (_conn, ()) = try_ready!(future.poll().map_err(finchers::error::fail));
401 Ok(Async::Ready(()))
402 }
403 }
404 }
405}