1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117
//! A library to help you convert your sync functions into non-blocking thread futures. //! //! Futurify uses `futures 0.1` for the moment. //! //! # Examples //! //! A simple `actix-web` server serving async endpoints: //! //! ```rust,no_run //! use actix_web::{web, App, Error, HttpResponse, HttpServer, Responder}; //! use futures::{Async, Future, Poll}; //! use std::time::Duration; //! //! fn index_async() -> impl Future<Item = impl Responder, Error = Error> { //! futures::future::ok("Hello world") //! } //! //! // this function will block the executor's thread. //! fn index_async_blocking() -> impl Future<Item = impl Responder, Error = Error> { //! futures::lazy(|| { //! std::thread::sleep(Duration::from_secs(5)); //! futures::future::ok("Hello blocking world") //! }) //! } //! //! // this function won't block the executor's thread. //! fn index_async_non_blocking() -> impl Future<Item = impl Responder, Error = Error> { //! futurify::wrap(|| { //! std::thread::sleep(Duration::from_secs(5)); //! "Hello blocking world" //! }) //! } //! //! fn main() -> std::io::Result<()> { //! HttpServer::new(|| { //! App::new() //! .route("/", web::get().to_async(index_async)) //! .route("/blocking", web::get().to_async(index_async_blocking)) //! .route("/non-blocking", web::get().to_async(index_async_non_blocking)) //! }) //! .workers(1) //! .bind("localhost:8080")? //! .run() //! } //! ``` //! By using `futurify` you'll be able to run the closure in a new thread and get the returned value in a future. use futures::{Async, Future}; use std::error::Error; use std::sync::mpsc::{channel, Receiver, Sender}; use std::thread; /// Future wrapping a sync function that will be executed /// in a separate thread. /// /// It uses `std::thread::spawn` and `mpsc::channel` under the hood. pub struct Futurified<T: Send + 'static, E: Error> { tx: Sender<T>, rx: Receiver<T>, wrapped: fn() -> T, is_running: bool, error: std::marker::PhantomData<E>, } /// Wraps a closure to be executed in a separate thread. /// It will be executed once the returning Future is polled. /// /// The Future will return whatever the closure returns. pub fn wrap<T: Send + 'static, E: Error>(wrapped: fn() -> T) -> Futurified<T, E> { let (tx, rx) = channel(); Futurified { tx, rx, wrapped, is_running: false, error: std::marker::PhantomData, } } /// Similar to `wrap` but this will execute the closure even if the /// future is never polled. /// /// See [`wrap`] for more details. pub fn wrap_eager<T: Send + 'static, E: Error>(wrapped: fn() -> T) -> Futurified<T, E> { let mut this = wrap(wrapped); this.run(); this } impl<T: Send + 'static, E: Error> Futurified<T, E> { fn run(&mut self) { self.is_running = true; let tx = self.tx.clone(); let sfn = self.wrapped; thread::spawn(move || { let result = sfn(); if let Err(e) = tx.send(result) { println!("Error sending result: {}", e) } }); } } impl<T: Send + 'static, E: Error> Future for Futurified<T, E> { type Item = T; type Error = E; fn poll(&mut self) -> Result<Async<Self::Item>, Self::Error> { if !self.is_running { self.run(); } if let Ok(x) = self.rx.try_recv() { Ok(Async::Ready(x)) } else { Ok(Async::NotReady) } } }