reifydb_client/http/
worker.rs1use std::{
5 sync::mpsc,
6 time::{Duration, Instant},
7};
8
9use crate::{
10 http::{
11 client::HttpClientConfig,
12 message::{HttpInternalMessage, HttpResponseRoute},
13 session::{HttpChannelResponse, HttpResponseMessage},
14 },
15 session::{CommandResult, QueryResult, convert_command_response, convert_query_response},
16};
17
18pub(crate) fn http_worker_thread(client: HttpClientConfig, command_rx: mpsc::Receiver<HttpInternalMessage>) {
20 loop {
23 match command_rx.recv_timeout(Duration::from_millis(100)) {
24 Ok(msg) => {
25 match msg {
26 HttpInternalMessage::Command {
27 id,
28 request,
29 route,
30 } => {
31 let timestamp = Instant::now();
32
33 let response = match client.send_command(&request) {
35 Ok(response) => Ok(HttpChannelResponse::Command {
36 request_id: id.clone(),
37 result: CommandResult {
38 frames: convert_command_response(response),
39 },
40 }),
41 Err(e) => Err(e),
42 };
43
44 match route {
46 HttpResponseRoute::Channel(tx) => {
47 let message = HttpResponseMessage {
48 request_id: id,
49 response,
50 timestamp,
51 };
52 let _ = tx.send(message);
53 }
54 }
55 }
56 HttpInternalMessage::Query {
57 id,
58 request,
59 route,
60 } => {
61 let timestamp = Instant::now();
62
63 let response = match client.send_query(&request) {
65 Ok(response) => Ok(HttpChannelResponse::Query {
66 request_id: id.clone(),
67 result: QueryResult {
68 frames: convert_query_response(response),
69 },
70 }),
71 Err(e) => Err(e),
72 };
73
74 match route {
76 HttpResponseRoute::Channel(tx) => {
77 let message = HttpResponseMessage {
78 request_id: id,
79 response,
80 timestamp,
81 };
82 let _ = tx.send(message);
83 }
84 }
85 }
86 HttpInternalMessage::Auth {
87 id,
88 _token: _,
89 route,
90 } => {
91 let timestamp = Instant::now();
98 let response = Ok(HttpChannelResponse::Auth {
99 request_id: id.clone(),
100 });
101
102 match route {
103 HttpResponseRoute::Channel(tx) => {
104 let message = HttpResponseMessage {
105 request_id: id,
106 response,
107 timestamp,
108 };
109 let _ = tx.send(message);
110 }
111 }
112 }
113 HttpInternalMessage::Close => {
114 break;
115 }
116 }
117 }
118 Err(mpsc::RecvTimeoutError::Timeout) => {
119 continue;
121 }
122 Err(mpsc::RecvTimeoutError::Disconnected) => {
123 break;
125 }
126 }
127 }
128}