wick_http_client/
component.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use anyhow::anyhow;
5use eventsource_stream::Eventsource;
6use flow_component::{BoxFuture, Component, ComponentError, LocalScope};
7use futures::{Stream, StreamExt, TryStreamExt};
8use reqwest::header::CONTENT_TYPE;
9use reqwest::{ClientBuilder, Method, Request, RequestBuilder};
10use serde_json::{Map, Value};
11use tracing::Span;
12use url::Url;
13use wick_config::config::components::{
14  ComponentConfig,
15  HttpClientComponentConfig,
16  HttpClientOperationDefinition,
17  OperationConfig,
18};
19use wick_config::config::{Codec, HttpMethod, LiquidJsonConfig, Metadata, UrlResource};
20use wick_config::{ConfigValidation, Resolver};
21use wick_interface_http::types::HttpEvent;
22use wick_interface_types::{ComponentSignature, OperationSignatures};
23use wick_packet::{
24  Base64Bytes,
25  FluxChannel,
26  Invocation,
27  Observer,
28  Packet,
29  PacketExt,
30  PacketSender,
31  PacketStream,
32  RuntimeConfig,
33};
34
35use crate::error::Error;
36static APP_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),);
37
38#[derive(Debug, Clone)]
39#[must_use]
40pub struct HttpClientComponent {
41  base: Url,
42  signature: ComponentSignature,
43  config: HttpClientComponentConfig,
44  root_config: Option<RuntimeConfig>,
45  path_templates: HashMap<String, Arc<(String, String)>>,
46  client: reqwest::Client,
47}
48
49impl HttpClientComponent {
50  #[allow(clippy::needless_pass_by_value)]
51  pub fn new(
52    config: HttpClientComponentConfig,
53    root_config: Option<RuntimeConfig>,
54    metadata: Option<Metadata>,
55    resolver: &Resolver,
56  ) -> Result<Self, ComponentError> {
57    validate(&config, resolver)?;
58    let addr: UrlResource = resolver(config.resource())
59      .and_then(|r| r.try_resource())
60      .and_then(|r| r.try_url())?;
61
62    let proxy_details = config.proxy().map(|proxy| {
63      let addr = resolver(proxy.resource())
64        .and_then(|r| r.try_resource())
65        .and_then(|r| r.try_url())
66        .ok();
67
68      let username = proxy.username();
69      let password = proxy.password();
70
71      (addr, username, password)
72    });
73
74    let (proxy_addr, proxy_username, proxy_password) = match proxy_details {
75      Some((addr, user, pass)) => (addr, Some(user).unwrap(), Some(pass).unwrap()),
76      None => (None, None, None),
77    };
78
79    let timeout = config.timeout().map_or_else(
80      || std::time::Duration::from_secs(5),
81      |timeout| std::time::Duration::from_secs(u64::from(timeout)),
82    );
83
84    let mut sig = ComponentSignature::new_named("wick/component/http");
85    sig.metadata.version = metadata.map(|v| v.version().to_owned());
86    sig.operations = config.operation_signatures();
87    sig.config = config.config().to_vec();
88
89    let url = addr
90      .url()
91      .value()
92      .cloned()
93      .ok_or_else(|| anyhow!("Internal Error - Invalid resource"))?;
94
95    let mut path_templates = HashMap::new();
96    for ops in config.operations() {
97      path_templates.insert(
98        ops.name().to_owned(),
99        Arc::new((ops.path().to_owned(), ops.path().to_owned())),
100      );
101    }
102
103    if proxy_addr == Some(addr.clone()) {
104      return Err(Error::ProxyLoop(addr.url().value().unwrap().clone()).into());
105    }
106
107    let client = match proxy_addr {
108      Some(proxy_addr) => {
109        let mut proxy = reqwest::Proxy::all(proxy_addr.url().value().unwrap().clone())?;
110
111        if let (Some(username), Some(password)) = (&proxy_username, &proxy_password) {
112          proxy = proxy.basic_auth(username.as_str(), password.as_str());
113        }
114
115        ClientBuilder::new()
116          .proxy(proxy)
117          .connect_timeout(timeout)
118          .user_agent(APP_USER_AGENT)
119          .build()?
120      }
121      None => ClientBuilder::new()
122        .connect_timeout(timeout)
123        .user_agent(APP_USER_AGENT)
124        .build()
125        .unwrap(),
126    };
127
128    Ok(Self {
129      signature: sig,
130      base: url,
131      path_templates,
132      client,
133      root_config,
134      config,
135    })
136  }
137}
138
139impl Component for HttpClientComponent {
140  fn handle(
141    &self,
142    invocation: Invocation,
143    op_config: Option<RuntimeConfig>,
144    _callback: LocalScope,
145  ) -> BoxFuture<Result<PacketStream, ComponentError>> {
146    let config = self.config.clone();
147    let baseurl = self.base.clone();
148    let codec = config.codec().copied();
149    let opdef = get_op_by_name(&config, invocation.target().operation_id());
150    let path_template = opdef
151      .as_ref()
152      .and_then(|op| self.path_templates.get(op.name()).cloned());
153    let client = self.client.clone();
154
155    Box::pin(async move {
156      let (tx, rx) = invocation.make_response();
157      let span = invocation.span().clone();
158      let fut = handle(
159        opdef,
160        tx.clone(),
161        invocation,
162        self.root_config.clone(),
163        op_config,
164        codec,
165        path_template,
166        baseurl,
167        client,
168      );
169      tokio::spawn(async move {
170        if let Err(e) = fut.await {
171          span.in_scope(|| error!(error = %e, "http:client"));
172          let _ = tx.error(wick_packet::Error::component_error(e.to_string()));
173        }
174      });
175      Ok(rx)
176    })
177  }
178
179  fn signature(&self) -> &ComponentSignature {
180    &self.signature
181  }
182}
183
184fn get_op_by_name(config: &HttpClientComponentConfig, name: &str) -> Option<HttpClientOperationDefinition> {
185  config.operations().iter().find(|op| op.name() == name).cloned()
186}
187
188#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
189async fn handle(
190  opdef: Option<HttpClientOperationDefinition>,
191  tx: FluxChannel<Packet, wick_packet::Error>,
192  invocation: Invocation,
193  root_config: Option<RuntimeConfig>,
194  op_config: Option<RuntimeConfig>,
195  codec: Option<Codec>,
196  path_template: Option<Arc<(String, String)>>,
197  baseurl: Url,
198  client: reqwest::Client,
199) -> anyhow::Result<()> {
200  if baseurl.cannot_be_a_base() {
201    return Err(Error::InvalidBaseUrl(baseurl).into());
202  }
203  let Some(opdef) = opdef else {
204    return Err(Error::OpNotFound(invocation.target().operation_id().to_owned()).into());
205  };
206  // Defer to operation codec, then to client codec, then to default.
207  let codec = opdef.codec().copied().unwrap_or(codec.unwrap_or_default());
208  let template = path_template.unwrap();
209
210  let input_list: Vec<_> = opdef.inputs().iter().map(|i| i.name.clone()).collect();
211  let (invocation, stream) = invocation.split();
212  let mut inputs = wick_packet::StreamMap::from_stream(stream, input_list);
213  let mut handles = Vec::new();
214
215  'outer: loop {
216    let inputs = match inputs.next_set().await {
217      Ok(Some(inputs)) => inputs,
218      Ok(None) => break 'outer,
219      Err(e) => {
220        let _ = tx.error(wick_packet::Error::component_error(e.to_string()));
221        break 'outer;
222      }
223    };
224
225    if inputs.values().all(|v| v.is_done()) {
226      break 'outer;
227    }
228    let inputs: Map<String, Value> = inputs
229      .into_iter()
230      .map(|(k, v)| {
231        let v = v
232          .decode_value()
233          .map_err(|e| {
234            invocation.trace(|| error!(port=%k,error=%e, "http:client"));
235            e
236          })
237          .unwrap_or(Value::Null);
238        (k, v)
239      })
240      .collect();
241    let inputs = Value::Object(inputs);
242
243    invocation.trace(|| trace!(inputs=?inputs, "http:client:inputs"));
244    let ctx = LiquidJsonConfig::make_context(
245      Some(inputs),
246      root_config.as_ref(),
247      op_config.as_ref(),
248      None,
249      Some(&invocation.inherent),
250    )?;
251
252    let body = match opdef.body() {
253      Some(body) => match body.render(&ctx) {
254        Ok(p) => Some(p),
255        Err(e) => {
256          let _ = tx.error(wick_packet::Error::component_error(e.to_string()));
257          break 'outer;
258        }
259      },
260      None => None,
261    };
262
263    let append_path = match liquid_json::render_string(&template.0, &ctx)
264      .map_err(|e| Error::PathTemplate(template.1.clone(), e.to_string()))
265    {
266      Ok(p) => p,
267      Err(e) => {
268        let _ = tx.error(wick_packet::Error::component_error(e.to_string()));
269        break 'outer;
270      }
271    };
272
273    let mut request_url = baseurl.clone();
274    let (path, query) = append_path.split_once('?').unwrap_or((&append_path, ""));
275    request_url.set_path(&format!("{}{}", request_url.path(), path));
276    request_url.set_query((!query.is_empty()).then_some(query));
277
278    invocation.trace(|| trace!(url= %request_url,body=?body, "http:client:request"));
279
280    let request = match opdef.method() {
281      HttpMethod::Get => Request::new(Method::GET, request_url),
282      HttpMethod::Post => Request::new(Method::POST, request_url),
283      HttpMethod::Put => Request::new(Method::PUT, request_url),
284      HttpMethod::Delete => Request::new(Method::DELETE, request_url),
285    };
286
287    let request_builder = RequestBuilder::from_parts(client.clone(), request);
288    let mut request_builder = if let Some(body) = body {
289      match codec {
290        Codec::Json => request_builder.json(&body),
291        Codec::Raw => {
292          unimplemented!("raw bodies not supported yet")
293        }
294        Codec::FormData => request_builder.form(&body),
295        Codec::Text => request_builder.body(body.to_string()),
296      }
297    } else {
298      request_builder
299    };
300
301    if let Some(headers) = opdef.headers() {
302      for (header, values) in headers {
303        for value in values {
304          let Ok(value) = liquid_json::render_string(value, &ctx) else {
305            let _ = tx.error(wick_packet::Error::component_error(format!(
306              "Can't render template {}",
307              value
308            )));
309            break 'outer;
310          };
311          request_builder = request_builder.header(header, value);
312        }
313      }
314    }
315
316    let (client, request) = request_builder.build_split();
317    let request = request.unwrap();
318
319    invocation.trace(|| debug!(request=?request, "http:client:request"));
320
321    let response = match client.execute(request).await {
322      Ok(r) => r,
323      Err(e) => {
324        let _ = tx.error(wick_packet::Error::component_error(e.to_string()));
325        break 'outer;
326      }
327    };
328
329    invocation.trace(|| debug!(status=%response.status(), "http:client:response_status"));
330
331    let content_type = response.headers().get(CONTENT_TYPE);
332    let event_stream = content_type.map_or(false, |t| t == "text/event-stream");
333
334    let codec = content_type.map_or(Codec::Raw, |value| {
335      let value = value.to_str().unwrap();
336      let (value, _other) = value.split_once(';').unwrap_or((value, ""));
337      if value.starts_with("text/") {
338        if event_stream {
339          Codec::Json
340        } else {
341          Codec::Text
342        }
343      } else {
344        match value {
345          "application/json" => Codec::Json,
346          "application/x-www-form-urlencoded" => Codec::FormData,
347          "application/xhtml+xml" => Codec::Text,
348          _ => Codec::Raw,
349        }
350      }
351    });
352
353    let (our_response, body_stream) = match crate::conversions::to_wick_response(response) {
354      Ok(r) => r,
355      Err(e) => {
356        let _ = tx.error(wick_packet::Error::component_error(e.to_string()));
357        break 'outer;
358      }
359    };
360    invocation.trace(|| debug!(response = ?our_response, "http:client:response"));
361
362    let _ = tx.send(Packet::encode("response", our_response));
363    handles.push(tokio::spawn(output_task(
364      invocation.span.clone(),
365      codec,
366      body_stream,
367      event_stream,
368      tx.clone(),
369    )));
370  }
371  let _ = tx.send(Packet::done("response"));
372  let _ = futures::future::join_all(handles).await;
373  let _ = tx.send(Packet::done("body"));
374
375  Ok(())
376}
377
378fn output_task<T: Stream<Item = Result<Base64Bytes, reqwest::Error>> + Send + Unpin + 'static>(
379  span: Span,
380  codec: Codec,
381  body_stream: T,
382  event_stream: bool,
383  tx: PacketSender,
384) -> BoxFuture<'static, ()> {
385  let task = async move {
386    if let Err(e) = output_task_inner(span, codec, body_stream, event_stream, tx.clone()).await {
387      let _ = tx.send(Packet::err("body", e.to_string()));
388    }
389  };
390  Box::pin(task)
391}
392
393async fn output_task_inner<T: Stream<Item = Result<Base64Bytes, reqwest::Error>> + Send + Unpin + 'static>(
394  span: Span,
395  codec: Codec,
396  mut body_stream: T,
397  event_stream: bool,
398  tx: PacketSender,
399) -> Result<(), anyhow::Error> {
400  match codec {
401    Codec::Json => {
402      if event_stream {
403        let mut stream = body_stream.map(Into::into).eventsource();
404        while let Some(event) = stream.next().await {
405          if let Err(e) = event {
406            tx.send(Packet::err("body", e.to_string()))?;
407            break;
408          }
409          let event = event.unwrap();
410          let wick_event = HttpEvent {
411            event: event.event,
412            data: event.data,
413            id: event.id,
414            retry: event.retry.map(|d| d.as_millis() as _),
415          };
416
417          span.in_scope(|| debug!("{} {}", format!("{:?}", wick_event), "http:client:response_body:event"));
418          tx.send(Packet::encode("body", wick_event))?;
419        }
420      } else {
421        let bytes: Vec<Base64Bytes> = body_stream.try_collect().await?;
422        let bytes = bytes.concat();
423
424        let json: Value = serde_json::from_slice(&bytes)?;
425        span.in_scope(|| trace!(%json, "http:client:response_body"));
426        tx.send(Packet::encode("body", json))?;
427      }
428    }
429    Codec::Raw => {
430      tx.send(Packet::open_bracket("body"))?;
431      while let Some(bytes) = body_stream.next().await {
432        let bytes = bytes?;
433
434        span.in_scope(|| debug!(len = bytes.len(), "http:client:response_body"));
435
436        tx.send(Packet::encode("body", bytes))?;
437      }
438      tx.send(Packet::close_bracket("body"))?;
439    }
440    Codec::FormData => unreachable!("Form data on the response is not supported."),
441    Codec::Text => {
442      tx.send(Packet::open_bracket("body"))?;
443      while let Some(bytes) = body_stream.next().await {
444        let bytes = bytes?;
445        let text = String::from_utf8(bytes.into())?;
446        span.in_scope(|| debug!(%text, "http:client:response_body"));
447        tx.send(Packet::encode("body", text))?;
448      }
449      tx.send(Packet::close_bracket("body"))?;
450    }
451  }
452  Ok(())
453}
454
455impl ConfigValidation for HttpClientComponent {
456  type Config = HttpClientComponentConfig;
457
458  fn validate(config: &Self::Config, resolver: &Resolver) -> Result<(), ComponentError> {
459    Ok(validate(config, resolver)?)
460  }
461}
462
463fn validate(_config: &HttpClientComponentConfig, _resolver: &Resolver) -> Result<(), Error> {
464  Ok(())
465}
466
467#[cfg(test)]
468mod test {
469  use std::str::FromStr;
470
471  use anyhow::Result;
472  use futures::StreamExt;
473  use serde_json::json;
474  use wick_config::config::components::{
475    HttpClientComponentConfig,
476    HttpClientComponentConfigBuilder,
477    HttpClientOperationDefinition,
478  };
479  use wick_config::config::{AppConfiguration, ResourceDefinition};
480  use wick_interface_types::{Field, Type};
481  use wick_packet::{packet_stream, Entity};
482
483  use super::*;
484
485  #[test]
486  const fn test_component() {
487    const fn is_send_sync<T: Send + Sync>() {}
488    is_send_sync::<HttpClientComponent>();
489  }
490
491  static GET_OP: &str = "get";
492  static POST_OP: &str = "post";
493  static POST_OP_TEXT: &str = "post_text";
494
495  fn get_config() -> (AppConfiguration, HttpClientComponentConfig) {
496    let mut config = HttpClientComponentConfigBuilder::default()
497      .resource("base")
498      .codec(Codec::Json)
499      .build()
500      .unwrap();
501
502    let get_headers = Some(HashMap::from([(
503      "Authorization".to_owned(),
504      vec!["Bearer {{ctx.config.secret}}".to_owned()],
505    )]));
506
507    config.operations_mut().push(
508      HttpClientOperationDefinition::new_get(
509        GET_OP,
510        "get?query1={{input}}&query2={{ctx.config.secret}}",
511        vec![Field::new("input", Type::String)],
512        get_headers,
513      )
514      .config([Field::new("secret", Type::String)])
515      .build()
516      .unwrap(),
517    );
518
519    let post_headers = Some(HashMap::from([(
520      "X-Custom-Header".to_owned(),
521      vec!["{{input}}".to_owned()],
522    )]));
523
524    config.operations_mut().push(
525      HttpClientOperationDefinition::new_post(
526        POST_OP,
527        "post?query1={{input}}",
528        vec![
529          Field::new("input", Type::String),
530          Field::new(
531            "number",
532            Type::List {
533              ty: Box::new(Type::I64),
534            },
535          ),
536        ],
537        Some(json!({"key": "{{input}}","other":"{{number | each: '{\"value\": {{el}} }' | json | output }}"}).into()),
538        post_headers.clone(),
539      )
540      .build()
541      .unwrap(),
542    );
543
544    config.operations_mut().push(
545      HttpClientOperationDefinition::new_post(
546        POST_OP_TEXT,
547        "post?query1={{input}}",
548        vec![Field::new("input", Type::String), Field::new("payload", Type::String)],
549        Some(json!({"key": "{{input}}","other":"{{number | each: '{\"value\": {{el}} }' | json | output }}"}).into()),
550        post_headers,
551      )
552      .build()
553      .unwrap(),
554    );
555
556    let mut app_config = wick_config::config::AppConfiguration::default();
557    app_config.add_resource(
558      "base",
559      ResourceDefinition::Url(UrlResource::new(
560        Url::from_str(&format!("http://{}", std::env::var("HTTPBIN").unwrap())).unwrap(),
561      )),
562    );
563
564    (app_config, config)
565  }
566
567  fn get_component(app_config: &AppConfiguration, component_config: HttpClientComponentConfig) -> HttpClientComponent {
568    let resolver = app_config.resolver();
569    HttpClientComponent::new(component_config, None, None, &resolver).unwrap()
570  }
571
572  #[test_logger::test(test)]
573  fn test_validate() -> Result<()> {
574    let (app_config, component_config) = get_config();
575
576    let result = validate(&component_config, &app_config.resolver());
577    assert_eq!(result, Ok(()));
578    Ok(())
579  }
580
581  mod integration_test {
582    use serde_json::json;
583    use wick_interface_http::types::{HttpResponse, HttpVersion};
584
585    use super::*;
586
587    #[test_logger::test(tokio::test)]
588    async fn test_get_request() -> Result<()> {
589      let (app_config, component_config) = get_config();
590      let comp = get_component(&app_config, component_config);
591      let packets = packet_stream!(("input", "SENTINEL"));
592      let invocation = Invocation::test("test_get_request", Entity::local(GET_OP), packets, Default::default())?;
593      let config = json!({"secret":"0xDEADBEEF"});
594      let mut stream = comp
595        .handle(invocation, Some(RuntimeConfig::from_value(config)?), Default::default())
596        .await?
597        .collect::<Vec<_>>()
598        .await;
599
600      assert_eq!(stream.pop().unwrap(), Ok(Packet::done("body")));
601      let response = stream.pop().unwrap().unwrap().decode_value().unwrap();
602      let response_args = response.get("args").unwrap();
603      assert_eq!(response_args, &json!( {"query1": "SENTINEL","query2": "0xDEADBEEF"}));
604      let response_headers = response.get("headers").unwrap();
605      assert_eq!(
606        response_headers.get("Authorization").unwrap(),
607        &json!("Bearer 0xDEADBEEF")
608      );
609      assert_eq!(stream.pop().unwrap(), Ok(Packet::done("response")));
610      let response: HttpResponse = stream.pop().unwrap().unwrap().decode().unwrap();
611      assert_eq!(response.version, HttpVersion::Http11);
612
613      Ok(())
614    }
615
616    #[test_logger::test(tokio::test)]
617    async fn test_post_request() -> Result<()> {
618      let (app_config, component_config) = get_config();
619      let comp = get_component(&app_config, component_config);
620      let packets = packet_stream!(("input", "SENTINEL"), ("number", [123, 345, 678]));
621      let invocation = Invocation::test("test_post_request", Entity::local(POST_OP), packets, Default::default())?;
622      let stream = comp
623        .handle(invocation, Default::default(), Default::default())
624        .await?
625        .filter(|p| futures::future::ready(p.as_ref().map_or(false, |p| p.has_data())))
626        .collect::<Vec<_>>()
627        .await;
628
629      let packets = stream.into_iter().collect::<Result<Vec<_>, _>>()?;
630      for packet in packets {
631        if packet.port() == "body" {
632          println!("{:?}", packet);
633          let response = packet.decode_value().unwrap();
634          println!("as json: {:?}", response);
635
636          let args = response.get("args").unwrap();
637          assert_eq!(args, &json!( {"query1": "SENTINEL"}));
638          let data = response.get("json").unwrap();
639          assert_eq!(
640            data,
641            &json!( {"key": "SENTINEL","other":[{"value":123},{"value":345},{"value":678}]})
642          );
643          let response_headers = response.get("headers").unwrap();
644          assert_eq!(
645            response_headers.get("Content-Type").unwrap(),
646            &json!("application/json")
647          );
648          assert_eq!(response_headers.get("X-Custom-Header").unwrap(), &json!("SENTINEL"));
649        } else {
650          let response: HttpResponse = packet.decode().unwrap();
651          assert_eq!(response.version, HttpVersion::Http11);
652        }
653      }
654
655      Ok(())
656    }
657
658    #[test_logger::test(tokio::test)]
659    async fn test_text_post_request() -> Result<()> {
660      let (app_config, component_config) = get_config();
661      let comp = get_component(&app_config, component_config);
662      let packets = packet_stream!(("input", "SENTINEL"), ("payload", "<xml>FOOBAR</xml>"));
663      let invocation = Invocation::test(
664        "test_text_post_request",
665        Entity::local(POST_OP_TEXT),
666        packets,
667        Default::default(),
668      )?;
669      let stream = comp
670        .handle(invocation, Default::default(), Default::default())
671        .await?
672        .filter(|p| futures::future::ready(p.as_ref().map_or(false, |p| p.has_data())))
673        .collect::<Vec<_>>()
674        .await;
675
676      let packets = stream.into_iter().collect::<Result<Vec<_>, _>>()?;
677      for packet in packets {
678        if packet.port() == "body" {
679          println!("{:?}", packet);
680          let response = packet.decode_value().unwrap();
681          println!("as json: {:?}", response);
682
683          let args = response.get("args").unwrap();
684          assert_eq!(args, &json!( {"query1": "SENTINEL"}));
685          let data = response.get("data").unwrap().to_string();
686          assert_eq!(data, "<xml>FOOBAR</xml>");
687          let response_headers = response.get("headers").unwrap();
688          assert_eq!(
689            response_headers.get("Content-Type").unwrap(),
690            &json!("application/json")
691          );
692          assert_eq!(response_headers.get("X-Custom-Header").unwrap(), &json!("SENTINEL"));
693        } else {
694          let response: HttpResponse = packet.decode().unwrap();
695          assert_eq!(response.version, HttpVersion::Http11);
696        }
697      }
698
699      Ok(())
700    }
701  }
702}