//! This module implements integration with `warp`.

use cfg_if::cfg_if;
use std::sync::Arc;

use super::Server;

cfg_if! {
    if #[cfg(any(
        any(feature = "docs", doc),
        all(
            feature = "serde_bincode",
            not(feature = "serde_json"),
            not(feature = "serde_cbor"),
            not(feature = "serde_rmp"),
        ),
        all(
            feature = "serde_cbor",
            not(feature = "serde_json"),
            not(feature = "serde_bincode"),
            not(feature = "serde_rmp"),
        ),
        all(
            feature = "serde_json",
            not(feature = "serde_bincode"),
            not(feature = "serde_cbor"),
            not(feature = "serde_rmp"),
        ),
        all(
            feature = "serde_rmp",
            not(feature = "serde_cbor"),
            not(feature = "serde_json"),
            not(feature = "serde_bincode"),
        ),
    ))] {
        use crate::codec::DefaultCodec;
        use warp::{Filter, Reply, filters::BoxedFilter};

        /// The following impl block is controlled by feature flags. It is enabled
        /// if and only if **exactly one** of the following feature flags is turned on:
        /// - `serde_bincode`
        /// - `serde_json`
        /// - `serde_cbor`
        /// - `serde_rmp`
        impl Server {
            /// WebSocket handler for integration with `warp`
            fn warp_websocket_handler(state: Arc<Self>, ws: warp::ws::Ws) -> impl warp::Reply {
                ws.on_upgrade(|websocket| async move {
                    // Wrap the upgraded WebSocket in the default codec and serve
                    // the registered services over it; errors are only logged.
                    let codec = DefaultCodec::with_warp_websocket(websocket);
                    let services = state.services.clone();

                    let fut = Self::serve_codec_setup(codec, services);

                    fut.await.unwrap_or_else(|e| log::error!("{}", e));
                })
            }

            /// Returns the `DEFAULT_RPC_PATH`
            fn handler_path() -> &'static str {
                crate::DEFAULT_RPC_PATH
            }

            /// Consumes `Server` and returns a `warp::filters::BoxedFilter`
            /// which can be chained with `warp` filters
            ///
            /// # Example
            /// ```rust
            /// use toy_rpc::Server;
            /// use toy_rpc::macros::{export_impl, service};
            /// use std::sync::Arc;
            ///
            /// struct FooService { }
            ///
            /// #[export_impl]
            /// impl FooService {
            ///     // define some "exported" functions here
            /// }
            ///
            /// #[tokio::main]
            /// async fn main() {
            ///     env_logger::init();
            ///
            ///     let foo_service = Arc::new(FooService { });
            ///
            ///     let server = Server::builder()
            ///         .register(foo_service)
            ///         .build();
            ///
            ///     let routes = warp::path("rpc")
            ///         .and(server.into_boxed_filter());
            ///
            ///     // RPC will be served at "ws://127.0.0.1:8080/rpc/_rpc_"
            ///     warp::serve(routes).run(([127, 0, 0, 1], 8080)).await;
            /// }
            /// ```
            pub fn into_boxed_filter(self) -> BoxedFilter<(impl Reply,)> {
                // Share the server across connections; each request gets a cloned `Arc`.
                let state = Arc::new(self);
                let state = warp::any().map(move || state.clone());

                // Match the RPC path, then upgrade the connection to a WebSocket.
                warp::path(Server::handler_path())
                    .and(state)
                    .and(warp::ws())
                    .map(Server::warp_websocket_handler)
                    .boxed()
            }

            #[cfg(any(
                all(
                    feature = "http_warp",
                    not(feature = "http_actix_web"),
                    not(feature = "http_tide"),
                ),
                feature = "docs"
            ))]
            #[cfg_attr(
                feature = "docs",
                doc(cfg(all(
                    feature = "http_warp",
                    not(feature = "http_actix_web"),
                    not(feature = "http_tide"),
                )))
            )]
            /// A convenience function that calls the corresponding HTTP handling
            /// function depending on the enabled feature flag
            ///
            /// | feature flag | function name |
            /// | ------------ | ------------- |
            /// | `http_tide` | [`into_endpoint`](#method.into_endpoint) |
            /// | `http_actix_web` | [`scope_config`](#method.scope_config) |
            /// | `http_warp` | [`into_boxed_filter`](#method.into_boxed_filter) |
            ///
            /// This is enabled
            /// if and only if **exactly one** of the following feature flags is turned on:
            /// - `serde_bincode`
            /// - `serde_json`
            /// - `serde_cbor`
            /// - `serde_rmp`
            ///
            /// # Example
            /// ```rust
            /// use toy_rpc::Server;
            /// use toy_rpc::macros::{export_impl, service};
            /// use std::sync::Arc;
            ///
            /// struct FooService { }
            ///
            /// #[export_impl]
            /// impl FooService {
            ///     // define some "exported" functions here
            /// }
            ///
            /// #[tokio::main]
            /// async fn main() {
            ///     env_logger::init();
            ///
            ///     let foo_service = Arc::new(FooService { });
            ///
            ///     let server = Server::builder()
            ///         .register(foo_service)
            ///         .build();
            ///
            ///     let routes = warp::path("rpc")
            ///         .and(server.handle_http());
            ///
            ///     // RPC will be served at "ws://127.0.0.1:8080/rpc/_rpc_"
            ///     warp::serve(routes).run(([127, 0, 0, 1], 8080)).await;
            /// }
            /// ```
            pub fn handle_http(self) -> BoxedFilter<(impl Reply,)> {
                self.into_boxed_filter()
            }
        }
    }
}