wasmcloud_httpclient/
lib.rs

//! http-client-provider
//!
//! This library exposes the HTTP client capability to wasmCloud-compliant actors.
3mod http_client;
4
5use codec::capabilities::{CapabilityProvider, Dispatcher, NullDispatcher};
6use codec::core::{OP_BIND_ACTOR, OP_REMOVE_ACTOR, SYSTEM_ACTOR};
7use codec::{capability_provider, deserialize};
8use http::{RequestArgs, OP_REQUEST};
9use log::{info, warn};
10use std::collections::HashMap;
11use std::error::Error;
12use std::sync::{Arc, RwLock};
13use std::time::Duration;
14use wasmcloud_actor_core::CapabilityConfiguration;
15use wasmcloud_actor_http_client as http;
16use wasmcloud_provider_core as codec;
17
/// Identifier string of the HTTP client capability.
// Nothing in the visible part of this file references the constant, hence the
// `allow(unused)` to keep the compiler quiet.
#[allow(unused)]
const CAPABILITY_ID: &str = "wasmcloud:httpclient";
20
// Unless the `static_plugin` feature is enabled, hand the provider type and its
// constructor to the `capability_provider!` macro.
// NOTE(review): presumably this generates the exported entry point used when the
// provider is loaded dynamically — confirm against `wasmcloud_provider_core`.
#[cfg(not(feature = "static_plugin"))]
capability_provider!(HttpClientProvider, HttpClientProvider::new);
23
/// An implementation of the HTTP client capability provider, backed by `reqwest`.
///
/// Every field sits behind an `Arc`, so cloning the provider is shallow: all clones
/// share the same dispatcher, client registry, and runtime.
#[derive(Clone)]
pub struct HttpClientProvider {
    /// Dispatcher installed through `configure_dispatch`; starts out as a `NullDispatcher`.
    dispatcher: Arc<RwLock<Box<dyn Dispatcher>>>,
    /// One `reqwest::Client` per bound actor, keyed by the `module` of its configuration.
    clients: Arc<RwLock<HashMap<String, reqwest::Client>>>,
    /// Tokio runtime used to drive the async HTTP request to completion via `block_on`.
    runtime: Arc<tokio::runtime::Runtime>,
}
31
32impl HttpClientProvider {
33    /// Create a new HTTP client provider.
34    pub fn new() -> Self {
35        Self::default()
36    }
37
38    /// Configure the HTTP client for a particular actor.
39    /// Each actor gets a dedicated client so that we can take advantage of connection pooling.
40    /// TODO: This needs to set things like timeouts, redirects, etc.
41    fn configure(
42        &self,
43        config: CapabilityConfiguration,
44    ) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
45        let timeout = match config.values.get("timeout") {
46            Some(v) => {
47                let parsed: u64 = v.parse()?;
48                Duration::new(parsed, 0)
49            }
50            None => Duration::new(30, 0),
51        };
52
53        let redirect_policy = match config.values.get("max_redirects") {
54            Some(v) => {
55                let parsed: usize = v.parse()?;
56                reqwest::redirect::Policy::limited(parsed)
57            }
58            None => reqwest::redirect::Policy::default(),
59        };
60
61        self.clients.write().unwrap().insert(
62            config.module,
63            reqwest::Client::builder()
64                .timeout(timeout)
65                .redirect(redirect_policy)
66                .build()?,
67        );
68        Ok(vec![])
69    }
70
71    /// Clean up resources when a actor disconnects.
72    /// This removes the HTTP client associated with an actor.
73    fn deconfigure(
74        &self,
75        config: CapabilityConfiguration,
76    ) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
77        if self
78            .clients
79            .write()
80            .unwrap()
81            .remove(&config.module)
82            .is_none()
83        {
84            warn!(
85                "attempted to remove non-existent actor: {}",
86                config.module.as_str()
87            );
88        }
89
90        Ok(vec![])
91    }
92
93    /// Make a HTTP request.
94    fn request(
95        &self,
96        actor: &str,
97        msg: RequestArgs,
98    ) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
99        let lock = self.clients.read().unwrap();
100        let client = lock.get(actor).unwrap();
101        self.runtime
102            .block_on(async { http_client::request(&client, msg).await })
103    }
104}
105
106impl Default for HttpClientProvider {
107    fn default() -> Self {
108        let _ = env_logger::builder().format_module_path(false).try_init();
109
110        let r = tokio::runtime::Builder::new_multi_thread()
111            .enable_all()
112            .build()
113            .unwrap();
114
115        HttpClientProvider {
116            dispatcher: Arc::new(RwLock::new(Box::new(NullDispatcher::new()))),
117            clients: Arc::new(RwLock::new(HashMap::new())),
118            runtime: Arc::new(r),
119        }
120    }
121}
122
123/// Implements the CapabilityProvider interface.
124impl CapabilityProvider for HttpClientProvider {
125    fn configure_dispatch(
126        &self,
127        dispatcher: Box<dyn Dispatcher>,
128    ) -> Result<(), Box<dyn Error + Send + Sync>> {
129        info!("Dispatcher configured");
130
131        let mut lock = self.dispatcher.write().unwrap();
132        *lock = dispatcher;
133        Ok(())
134    }
135
136    /// Handle all calls from actors.
137    fn handle_call(
138        &self,
139        actor: &str,
140        op: &str,
141        msg: &[u8],
142    ) -> Result<Vec<u8>, Box<dyn Error + Send + Sync>> {
143        match op {
144            OP_BIND_ACTOR if actor == SYSTEM_ACTOR => self.configure(deserialize(msg)?),
145            OP_REMOVE_ACTOR if actor == SYSTEM_ACTOR => self.deconfigure(deserialize(msg)?),
146            OP_REQUEST => self.request(actor, deserialize(msg)?),
147            _ => Err(format!("Unknown operation: {}", op).into()),
148        }
149    }
150
151    fn stop(&self) {}
152}
153
#[cfg(test)]
mod tests {
    use super::*;
    use codec::deserialize;
    use mockito::mock;
    use wasmcloud_actor_http_client::{RequestArgs, Response};

    /// A GET issued through a freshly configured client against the mockito server
    /// comes back with status 200.
    #[test]
    fn test_request() {
        let _ = env_logger::try_init();

        let args = RequestArgs {
            method: String::from("GET"),
            url: mockito::server_url(),
            headers: HashMap::new(),
            body: Vec::new(),
        };

        // Keep the mock bound to a local for the whole test.
        let _mock_guard = mock("GET", "/")
            .with_header("content-type", "text/plain")
            .with_body("ohai")
            .create();

        let provider = HttpClientProvider::new();
        let binding = CapabilityConfiguration {
            module: String::from("test"),
            values: HashMap::new(),
        };
        provider.configure(binding).unwrap();

        let raw = provider.request("test", args).unwrap();
        let response: Response = deserialize(raw.as_slice()).unwrap();

        assert_eq!(response.status_code, 200);
    }
}