1use std::collections::HashMap;
2use std::sync::Arc;
3
4use anyhow::anyhow;
5use eventsource_stream::Eventsource;
6use flow_component::{BoxFuture, Component, ComponentError, LocalScope};
7use futures::{Stream, StreamExt, TryStreamExt};
8use reqwest::header::CONTENT_TYPE;
9use reqwest::{ClientBuilder, Method, Request, RequestBuilder};
10use serde_json::{Map, Value};
11use tracing::Span;
12use url::Url;
13use wick_config::config::components::{
14 ComponentConfig,
15 HttpClientComponentConfig,
16 HttpClientOperationDefinition,
17 OperationConfig,
18};
19use wick_config::config::{Codec, HttpMethod, LiquidJsonConfig, Metadata, UrlResource};
20use wick_config::{ConfigValidation, Resolver};
21use wick_interface_http::types::HttpEvent;
22use wick_interface_types::{ComponentSignature, OperationSignatures};
23use wick_packet::{
24 Base64Bytes,
25 FluxChannel,
26 Invocation,
27 Observer,
28 Packet,
29 PacketExt,
30 PacketSender,
31 PacketStream,
32 RuntimeConfig,
33};
34
35use crate::error::Error;
36static APP_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),);
37
38#[derive(Debug, Clone)]
39#[must_use]
40pub struct HttpClientComponent {
41 base: Url,
42 signature: ComponentSignature,
43 config: HttpClientComponentConfig,
44 root_config: Option<RuntimeConfig>,
45 path_templates: HashMap<String, Arc<(String, String)>>,
46 client: reqwest::Client,
47}
48
49impl HttpClientComponent {
50 #[allow(clippy::needless_pass_by_value)]
51 pub fn new(
52 config: HttpClientComponentConfig,
53 root_config: Option<RuntimeConfig>,
54 metadata: Option<Metadata>,
55 resolver: &Resolver,
56 ) -> Result<Self, ComponentError> {
57 validate(&config, resolver)?;
58 let addr: UrlResource = resolver(config.resource())
59 .and_then(|r| r.try_resource())
60 .and_then(|r| r.try_url())?;
61
62 let proxy_details = config.proxy().map(|proxy| {
63 let addr = resolver(proxy.resource())
64 .and_then(|r| r.try_resource())
65 .and_then(|r| r.try_url())
66 .ok();
67
68 let username = proxy.username();
69 let password = proxy.password();
70
71 (addr, username, password)
72 });
73
74 let (proxy_addr, proxy_username, proxy_password) = match proxy_details {
75 Some((addr, user, pass)) => (addr, Some(user).unwrap(), Some(pass).unwrap()),
76 None => (None, None, None),
77 };
78
79 let timeout = config.timeout().map_or_else(
80 || std::time::Duration::from_secs(5),
81 |timeout| std::time::Duration::from_secs(u64::from(timeout)),
82 );
83
84 let mut sig = ComponentSignature::new_named("wick/component/http");
85 sig.metadata.version = metadata.map(|v| v.version().to_owned());
86 sig.operations = config.operation_signatures();
87 sig.config = config.config().to_vec();
88
89 let url = addr
90 .url()
91 .value()
92 .cloned()
93 .ok_or_else(|| anyhow!("Internal Error - Invalid resource"))?;
94
95 let mut path_templates = HashMap::new();
96 for ops in config.operations() {
97 path_templates.insert(
98 ops.name().to_owned(),
99 Arc::new((ops.path().to_owned(), ops.path().to_owned())),
100 );
101 }
102
103 if proxy_addr == Some(addr.clone()) {
104 return Err(Error::ProxyLoop(addr.url().value().unwrap().clone()).into());
105 }
106
107 let client = match proxy_addr {
108 Some(proxy_addr) => {
109 let mut proxy = reqwest::Proxy::all(proxy_addr.url().value().unwrap().clone())?;
110
111 if let (Some(username), Some(password)) = (&proxy_username, &proxy_password) {
112 proxy = proxy.basic_auth(username.as_str(), password.as_str());
113 }
114
115 ClientBuilder::new()
116 .proxy(proxy)
117 .connect_timeout(timeout)
118 .user_agent(APP_USER_AGENT)
119 .build()?
120 }
121 None => ClientBuilder::new()
122 .connect_timeout(timeout)
123 .user_agent(APP_USER_AGENT)
124 .build()
125 .unwrap(),
126 };
127
128 Ok(Self {
129 signature: sig,
130 base: url,
131 path_templates,
132 client,
133 root_config,
134 config,
135 })
136 }
137}
138
139impl Component for HttpClientComponent {
140 fn handle(
141 &self,
142 invocation: Invocation,
143 op_config: Option<RuntimeConfig>,
144 _callback: LocalScope,
145 ) -> BoxFuture<Result<PacketStream, ComponentError>> {
146 let config = self.config.clone();
147 let baseurl = self.base.clone();
148 let codec = config.codec().copied();
149 let opdef = get_op_by_name(&config, invocation.target().operation_id());
150 let path_template = opdef
151 .as_ref()
152 .and_then(|op| self.path_templates.get(op.name()).cloned());
153 let client = self.client.clone();
154
155 Box::pin(async move {
156 let (tx, rx) = invocation.make_response();
157 let span = invocation.span().clone();
158 let fut = handle(
159 opdef,
160 tx.clone(),
161 invocation,
162 self.root_config.clone(),
163 op_config,
164 codec,
165 path_template,
166 baseurl,
167 client,
168 );
169 tokio::spawn(async move {
170 if let Err(e) = fut.await {
171 span.in_scope(|| error!(error = %e, "http:client"));
172 let _ = tx.error(wick_packet::Error::component_error(e.to_string()));
173 }
174 });
175 Ok(rx)
176 })
177 }
178
179 fn signature(&self) -> &ComponentSignature {
180 &self.signature
181 }
182}
183
184fn get_op_by_name(config: &HttpClientComponentConfig, name: &str) -> Option<HttpClientOperationDefinition> {
185 config.operations().iter().find(|op| op.name() == name).cloned()
186}
187
188#[allow(clippy::too_many_arguments, clippy::too_many_lines)]
189async fn handle(
190 opdef: Option<HttpClientOperationDefinition>,
191 tx: FluxChannel<Packet, wick_packet::Error>,
192 invocation: Invocation,
193 root_config: Option<RuntimeConfig>,
194 op_config: Option<RuntimeConfig>,
195 codec: Option<Codec>,
196 path_template: Option<Arc<(String, String)>>,
197 baseurl: Url,
198 client: reqwest::Client,
199) -> anyhow::Result<()> {
200 if baseurl.cannot_be_a_base() {
201 return Err(Error::InvalidBaseUrl(baseurl).into());
202 }
203 let Some(opdef) = opdef else {
204 return Err(Error::OpNotFound(invocation.target().operation_id().to_owned()).into());
205 };
206 let codec = opdef.codec().copied().unwrap_or(codec.unwrap_or_default());
208 let template = path_template.unwrap();
209
210 let input_list: Vec<_> = opdef.inputs().iter().map(|i| i.name.clone()).collect();
211 let (invocation, stream) = invocation.split();
212 let mut inputs = wick_packet::StreamMap::from_stream(stream, input_list);
213 let mut handles = Vec::new();
214
215 'outer: loop {
216 let inputs = match inputs.next_set().await {
217 Ok(Some(inputs)) => inputs,
218 Ok(None) => break 'outer,
219 Err(e) => {
220 let _ = tx.error(wick_packet::Error::component_error(e.to_string()));
221 break 'outer;
222 }
223 };
224
225 if inputs.values().all(|v| v.is_done()) {
226 break 'outer;
227 }
228 let inputs: Map<String, Value> = inputs
229 .into_iter()
230 .map(|(k, v)| {
231 let v = v
232 .decode_value()
233 .map_err(|e| {
234 invocation.trace(|| error!(port=%k,error=%e, "http:client"));
235 e
236 })
237 .unwrap_or(Value::Null);
238 (k, v)
239 })
240 .collect();
241 let inputs = Value::Object(inputs);
242
243 invocation.trace(|| trace!(inputs=?inputs, "http:client:inputs"));
244 let ctx = LiquidJsonConfig::make_context(
245 Some(inputs),
246 root_config.as_ref(),
247 op_config.as_ref(),
248 None,
249 Some(&invocation.inherent),
250 )?;
251
252 let body = match opdef.body() {
253 Some(body) => match body.render(&ctx) {
254 Ok(p) => Some(p),
255 Err(e) => {
256 let _ = tx.error(wick_packet::Error::component_error(e.to_string()));
257 break 'outer;
258 }
259 },
260 None => None,
261 };
262
263 let append_path = match liquid_json::render_string(&template.0, &ctx)
264 .map_err(|e| Error::PathTemplate(template.1.clone(), e.to_string()))
265 {
266 Ok(p) => p,
267 Err(e) => {
268 let _ = tx.error(wick_packet::Error::component_error(e.to_string()));
269 break 'outer;
270 }
271 };
272
273 let mut request_url = baseurl.clone();
274 let (path, query) = append_path.split_once('?').unwrap_or((&append_path, ""));
275 request_url.set_path(&format!("{}{}", request_url.path(), path));
276 request_url.set_query((!query.is_empty()).then_some(query));
277
278 invocation.trace(|| trace!(url= %request_url,body=?body, "http:client:request"));
279
280 let request = match opdef.method() {
281 HttpMethod::Get => Request::new(Method::GET, request_url),
282 HttpMethod::Post => Request::new(Method::POST, request_url),
283 HttpMethod::Put => Request::new(Method::PUT, request_url),
284 HttpMethod::Delete => Request::new(Method::DELETE, request_url),
285 };
286
287 let request_builder = RequestBuilder::from_parts(client.clone(), request);
288 let mut request_builder = if let Some(body) = body {
289 match codec {
290 Codec::Json => request_builder.json(&body),
291 Codec::Raw => {
292 unimplemented!("raw bodies not supported yet")
293 }
294 Codec::FormData => request_builder.form(&body),
295 Codec::Text => request_builder.body(body.to_string()),
296 }
297 } else {
298 request_builder
299 };
300
301 if let Some(headers) = opdef.headers() {
302 for (header, values) in headers {
303 for value in values {
304 let Ok(value) = liquid_json::render_string(value, &ctx) else {
305 let _ = tx.error(wick_packet::Error::component_error(format!(
306 "Can't render template {}",
307 value
308 )));
309 break 'outer;
310 };
311 request_builder = request_builder.header(header, value);
312 }
313 }
314 }
315
316 let (client, request) = request_builder.build_split();
317 let request = request.unwrap();
318
319 invocation.trace(|| debug!(request=?request, "http:client:request"));
320
321 let response = match client.execute(request).await {
322 Ok(r) => r,
323 Err(e) => {
324 let _ = tx.error(wick_packet::Error::component_error(e.to_string()));
325 break 'outer;
326 }
327 };
328
329 invocation.trace(|| debug!(status=%response.status(), "http:client:response_status"));
330
331 let content_type = response.headers().get(CONTENT_TYPE);
332 let event_stream = content_type.map_or(false, |t| t == "text/event-stream");
333
334 let codec = content_type.map_or(Codec::Raw, |value| {
335 let value = value.to_str().unwrap();
336 let (value, _other) = value.split_once(';').unwrap_or((value, ""));
337 if value.starts_with("text/") {
338 if event_stream {
339 Codec::Json
340 } else {
341 Codec::Text
342 }
343 } else {
344 match value {
345 "application/json" => Codec::Json,
346 "application/x-www-form-urlencoded" => Codec::FormData,
347 "application/xhtml+xml" => Codec::Text,
348 _ => Codec::Raw,
349 }
350 }
351 });
352
353 let (our_response, body_stream) = match crate::conversions::to_wick_response(response) {
354 Ok(r) => r,
355 Err(e) => {
356 let _ = tx.error(wick_packet::Error::component_error(e.to_string()));
357 break 'outer;
358 }
359 };
360 invocation.trace(|| debug!(response = ?our_response, "http:client:response"));
361
362 let _ = tx.send(Packet::encode("response", our_response));
363 handles.push(tokio::spawn(output_task(
364 invocation.span.clone(),
365 codec,
366 body_stream,
367 event_stream,
368 tx.clone(),
369 )));
370 }
371 let _ = tx.send(Packet::done("response"));
372 let _ = futures::future::join_all(handles).await;
373 let _ = tx.send(Packet::done("body"));
374
375 Ok(())
376}
377
378fn output_task<T: Stream<Item = Result<Base64Bytes, reqwest::Error>> + Send + Unpin + 'static>(
379 span: Span,
380 codec: Codec,
381 body_stream: T,
382 event_stream: bool,
383 tx: PacketSender,
384) -> BoxFuture<'static, ()> {
385 let task = async move {
386 if let Err(e) = output_task_inner(span, codec, body_stream, event_stream, tx.clone()).await {
387 let _ = tx.send(Packet::err("body", e.to_string()));
388 }
389 };
390 Box::pin(task)
391}
392
393async fn output_task_inner<T: Stream<Item = Result<Base64Bytes, reqwest::Error>> + Send + Unpin + 'static>(
394 span: Span,
395 codec: Codec,
396 mut body_stream: T,
397 event_stream: bool,
398 tx: PacketSender,
399) -> Result<(), anyhow::Error> {
400 match codec {
401 Codec::Json => {
402 if event_stream {
403 let mut stream = body_stream.map(Into::into).eventsource();
404 while let Some(event) = stream.next().await {
405 if let Err(e) = event {
406 tx.send(Packet::err("body", e.to_string()))?;
407 break;
408 }
409 let event = event.unwrap();
410 let wick_event = HttpEvent {
411 event: event.event,
412 data: event.data,
413 id: event.id,
414 retry: event.retry.map(|d| d.as_millis() as _),
415 };
416
417 span.in_scope(|| debug!("{} {}", format!("{:?}", wick_event), "http:client:response_body:event"));
418 tx.send(Packet::encode("body", wick_event))?;
419 }
420 } else {
421 let bytes: Vec<Base64Bytes> = body_stream.try_collect().await?;
422 let bytes = bytes.concat();
423
424 let json: Value = serde_json::from_slice(&bytes)?;
425 span.in_scope(|| trace!(%json, "http:client:response_body"));
426 tx.send(Packet::encode("body", json))?;
427 }
428 }
429 Codec::Raw => {
430 tx.send(Packet::open_bracket("body"))?;
431 while let Some(bytes) = body_stream.next().await {
432 let bytes = bytes?;
433
434 span.in_scope(|| debug!(len = bytes.len(), "http:client:response_body"));
435
436 tx.send(Packet::encode("body", bytes))?;
437 }
438 tx.send(Packet::close_bracket("body"))?;
439 }
440 Codec::FormData => unreachable!("Form data on the response is not supported."),
441 Codec::Text => {
442 tx.send(Packet::open_bracket("body"))?;
443 while let Some(bytes) = body_stream.next().await {
444 let bytes = bytes?;
445 let text = String::from_utf8(bytes.into())?;
446 span.in_scope(|| debug!(%text, "http:client:response_body"));
447 tx.send(Packet::encode("body", text))?;
448 }
449 tx.send(Packet::close_bracket("body"))?;
450 }
451 }
452 Ok(())
453}
454
455impl ConfigValidation for HttpClientComponent {
456 type Config = HttpClientComponentConfig;
457
458 fn validate(config: &Self::Config, resolver: &Resolver) -> Result<(), ComponentError> {
459 Ok(validate(config, resolver)?)
460 }
461}
462
463fn validate(_config: &HttpClientComponentConfig, _resolver: &Resolver) -> Result<(), Error> {
464 Ok(())
465}
466
467#[cfg(test)]
468mod test {
469 use std::str::FromStr;
470
471 use anyhow::Result;
472 use futures::StreamExt;
473 use serde_json::json;
474 use wick_config::config::components::{
475 HttpClientComponentConfig,
476 HttpClientComponentConfigBuilder,
477 HttpClientOperationDefinition,
478 };
479 use wick_config::config::{AppConfiguration, ResourceDefinition};
480 use wick_interface_types::{Field, Type};
481 use wick_packet::{packet_stream, Entity};
482
483 use super::*;
484
485 #[test]
486 const fn test_component() {
487 const fn is_send_sync<T: Send + Sync>() {}
488 is_send_sync::<HttpClientComponent>();
489 }
490
491 static GET_OP: &str = "get";
492 static POST_OP: &str = "post";
493 static POST_OP_TEXT: &str = "post_text";
494
495 fn get_config() -> (AppConfiguration, HttpClientComponentConfig) {
496 let mut config = HttpClientComponentConfigBuilder::default()
497 .resource("base")
498 .codec(Codec::Json)
499 .build()
500 .unwrap();
501
502 let get_headers = Some(HashMap::from([(
503 "Authorization".to_owned(),
504 vec!["Bearer {{ctx.config.secret}}".to_owned()],
505 )]));
506
507 config.operations_mut().push(
508 HttpClientOperationDefinition::new_get(
509 GET_OP,
510 "get?query1={{input}}&query2={{ctx.config.secret}}",
511 vec![Field::new("input", Type::String)],
512 get_headers,
513 )
514 .config([Field::new("secret", Type::String)])
515 .build()
516 .unwrap(),
517 );
518
519 let post_headers = Some(HashMap::from([(
520 "X-Custom-Header".to_owned(),
521 vec!["{{input}}".to_owned()],
522 )]));
523
524 config.operations_mut().push(
525 HttpClientOperationDefinition::new_post(
526 POST_OP,
527 "post?query1={{input}}",
528 vec![
529 Field::new("input", Type::String),
530 Field::new(
531 "number",
532 Type::List {
533 ty: Box::new(Type::I64),
534 },
535 ),
536 ],
537 Some(json!({"key": "{{input}}","other":"{{number | each: '{\"value\": {{el}} }' | json | output }}"}).into()),
538 post_headers.clone(),
539 )
540 .build()
541 .unwrap(),
542 );
543
544 config.operations_mut().push(
545 HttpClientOperationDefinition::new_post(
546 POST_OP_TEXT,
547 "post?query1={{input}}",
548 vec![Field::new("input", Type::String), Field::new("payload", Type::String)],
549 Some(json!({"key": "{{input}}","other":"{{number | each: '{\"value\": {{el}} }' | json | output }}"}).into()),
550 post_headers,
551 )
552 .build()
553 .unwrap(),
554 );
555
556 let mut app_config = wick_config::config::AppConfiguration::default();
557 app_config.add_resource(
558 "base",
559 ResourceDefinition::Url(UrlResource::new(
560 Url::from_str(&format!("http://{}", std::env::var("HTTPBIN").unwrap())).unwrap(),
561 )),
562 );
563
564 (app_config, config)
565 }
566
567 fn get_component(app_config: &AppConfiguration, component_config: HttpClientComponentConfig) -> HttpClientComponent {
568 let resolver = app_config.resolver();
569 HttpClientComponent::new(component_config, None, None, &resolver).unwrap()
570 }
571
572 #[test_logger::test(test)]
573 fn test_validate() -> Result<()> {
574 let (app_config, component_config) = get_config();
575
576 let result = validate(&component_config, &app_config.resolver());
577 assert_eq!(result, Ok(()));
578 Ok(())
579 }
580
581 mod integration_test {
582 use serde_json::json;
583 use wick_interface_http::types::{HttpResponse, HttpVersion};
584
585 use super::*;
586
587 #[test_logger::test(tokio::test)]
588 async fn test_get_request() -> Result<()> {
589 let (app_config, component_config) = get_config();
590 let comp = get_component(&app_config, component_config);
591 let packets = packet_stream!(("input", "SENTINEL"));
592 let invocation = Invocation::test("test_get_request", Entity::local(GET_OP), packets, Default::default())?;
593 let config = json!({"secret":"0xDEADBEEF"});
594 let mut stream = comp
595 .handle(invocation, Some(RuntimeConfig::from_value(config)?), Default::default())
596 .await?
597 .collect::<Vec<_>>()
598 .await;
599
600 assert_eq!(stream.pop().unwrap(), Ok(Packet::done("body")));
601 let response = stream.pop().unwrap().unwrap().decode_value().unwrap();
602 let response_args = response.get("args").unwrap();
603 assert_eq!(response_args, &json!( {"query1": "SENTINEL","query2": "0xDEADBEEF"}));
604 let response_headers = response.get("headers").unwrap();
605 assert_eq!(
606 response_headers.get("Authorization").unwrap(),
607 &json!("Bearer 0xDEADBEEF")
608 );
609 assert_eq!(stream.pop().unwrap(), Ok(Packet::done("response")));
610 let response: HttpResponse = stream.pop().unwrap().unwrap().decode().unwrap();
611 assert_eq!(response.version, HttpVersion::Http11);
612
613 Ok(())
614 }
615
616 #[test_logger::test(tokio::test)]
617 async fn test_post_request() -> Result<()> {
618 let (app_config, component_config) = get_config();
619 let comp = get_component(&app_config, component_config);
620 let packets = packet_stream!(("input", "SENTINEL"), ("number", [123, 345, 678]));
621 let invocation = Invocation::test("test_post_request", Entity::local(POST_OP), packets, Default::default())?;
622 let stream = comp
623 .handle(invocation, Default::default(), Default::default())
624 .await?
625 .filter(|p| futures::future::ready(p.as_ref().map_or(false, |p| p.has_data())))
626 .collect::<Vec<_>>()
627 .await;
628
629 let packets = stream.into_iter().collect::<Result<Vec<_>, _>>()?;
630 for packet in packets {
631 if packet.port() == "body" {
632 println!("{:?}", packet);
633 let response = packet.decode_value().unwrap();
634 println!("as json: {:?}", response);
635
636 let args = response.get("args").unwrap();
637 assert_eq!(args, &json!( {"query1": "SENTINEL"}));
638 let data = response.get("json").unwrap();
639 assert_eq!(
640 data,
641 &json!( {"key": "SENTINEL","other":[{"value":123},{"value":345},{"value":678}]})
642 );
643 let response_headers = response.get("headers").unwrap();
644 assert_eq!(
645 response_headers.get("Content-Type").unwrap(),
646 &json!("application/json")
647 );
648 assert_eq!(response_headers.get("X-Custom-Header").unwrap(), &json!("SENTINEL"));
649 } else {
650 let response: HttpResponse = packet.decode().unwrap();
651 assert_eq!(response.version, HttpVersion::Http11);
652 }
653 }
654
655 Ok(())
656 }
657
658 #[test_logger::test(tokio::test)]
659 async fn test_text_post_request() -> Result<()> {
660 let (app_config, component_config) = get_config();
661 let comp = get_component(&app_config, component_config);
662 let packets = packet_stream!(("input", "SENTINEL"), ("payload", "<xml>FOOBAR</xml>"));
663 let invocation = Invocation::test(
664 "test_text_post_request",
665 Entity::local(POST_OP_TEXT),
666 packets,
667 Default::default(),
668 )?;
669 let stream = comp
670 .handle(invocation, Default::default(), Default::default())
671 .await?
672 .filter(|p| futures::future::ready(p.as_ref().map_or(false, |p| p.has_data())))
673 .collect::<Vec<_>>()
674 .await;
675
676 let packets = stream.into_iter().collect::<Result<Vec<_>, _>>()?;
677 for packet in packets {
678 if packet.port() == "body" {
679 println!("{:?}", packet);
680 let response = packet.decode_value().unwrap();
681 println!("as json: {:?}", response);
682
683 let args = response.get("args").unwrap();
684 assert_eq!(args, &json!( {"query1": "SENTINEL"}));
685 let data = response.get("data").unwrap().to_string();
686 assert_eq!(data, "<xml>FOOBAR</xml>");
687 let response_headers = response.get("headers").unwrap();
688 assert_eq!(
689 response_headers.get("Content-Type").unwrap(),
690 &json!("application/json")
691 );
692 assert_eq!(response_headers.get("X-Custom-Header").unwrap(), &json!("SENTINEL"));
693 } else {
694 let response: HttpResponse = packet.decode().unwrap();
695 assert_eq!(response.version, HttpVersion::Http11);
696 }
697 }
698
699 Ok(())
700 }
701 }
702}