datex_native/com_interfaces/http/
http_client.rs1use datex_core::channel::mpsc::create_unbounded_channel;
2use datex_core::derive_setup_data;
3use datex_core::network::com_hub::errors::ComInterfaceCreateError;
4use datex_core::network::com_hub::managers::com_interface_manager::ComInterfaceAsyncFactoryResult;
5use datex_core::network::com_interfaces::com_interface::factory::{ComInterfaceAsyncFactory, ComInterfaceConfiguration, SendCallback, SendFailure, SocketConfiguration, SocketProperties};
6use datex_core::network::com_interfaces::com_interface::properties::{ComInterfaceProperties, InterfaceDirection};
7use datex_core::network::com_interfaces::default_setup_data::http::http_client::HTTPClientInterfaceSetupData;
8
// Declares `HTTPClientInterfaceSetupDataNative` from the core
// `HTTPClientInterfaceSetupData` via `derive_setup_data!`.
// NOTE(review): presumably a native-side wrapper exposing the same fields
// (e.g. `url`) — confirm against the macro definition in `datex_core`.
derive_setup_data!(HTTPClientInterfaceSetupDataNative, HTTPClientInterfaceSetupData);
10
11impl HTTPClientInterfaceSetupDataNative {
12 async fn create_interface(self) -> Result<ComInterfaceConfiguration, ComInterfaceCreateError> {
13
14 let (response_sender, mut response_receiver) = create_unbounded_channel::<Vec<u8>>();
15
16 Ok(ComInterfaceConfiguration::new_single_socket(
17 ComInterfaceProperties {
18 name: Some(self.url.clone()),
19 ..Self::get_default_properties()
20 },
21 SocketConfiguration::new_in_out(
22 SocketProperties::new(InterfaceDirection::InOut, 1),
23 async gen move {
24 while let Some(response_data) = response_receiver.next().await {
25 yield Ok(response_data);
26 }
27 },
28 SendCallback::new_async(move |block| {
29 let url = self.url.clone();
30 let mut response_sender = response_sender.clone();
31 async move {
32 let client = reqwest::Client::new();
33 let response = client.post(&url)
34 .body(block.to_bytes())
35 .send()
36 .await
37 .map_err(|e| {
38 println!("HTTP request error: {:#?}", e);
39 SendFailure(Box::new(block.clone()))
40 })?;
41 let status = response.status();
42 let bytes = response.bytes().await
43 .map_err(|e| {
44 println!("HTTP response read error: {:#?}", e);
45 SendFailure(Box::new(block.clone()))
46 })?;
47 response_sender.start_send(bytes.to_vec())
48 .unwrap();
49
50 if status.is_success() {
51 Ok(())
52 } else {
53 println!("HTTP request failed with status: {}", status);
54 Err(SendFailure(Box::new(block)))
55 }
56 }
57 })
58 )
59 ))
60 }
61}
62
63impl ComInterfaceAsyncFactory for HTTPClientInterfaceSetupDataNative {
64 fn create_interface(self) -> ComInterfaceAsyncFactoryResult {
65 Box::pin(self.create_interface())
66 }
67
68 fn get_default_properties() -> ComInterfaceProperties {
69 HTTPClientInterfaceSetupData::get_default_properties()
70 }
71}