reifydb_client/http/
worker.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the MIT
3
4use std::{
5	sync::mpsc,
6	time::{Duration, Instant},
7};
8
9use crate::{
10	http::{
11		client::HttpClientConfig,
12		message::{HttpInternalMessage, HttpResponseRoute},
13		session::{HttpChannelResponse, HttpResponseMessage},
14	},
15	session::{CommandResult, QueryResult, convert_command_response, convert_query_response},
16};
17
18/// HTTP worker thread that handles all requests for a client
19pub(crate) fn http_worker_thread(client: HttpClientConfig, command_rx: mpsc::Receiver<HttpInternalMessage>) {
20	// Process messages from the command channel with timeout to prevent
21	// hanging
22	loop {
23		match command_rx.recv_timeout(Duration::from_millis(100)) {
24			Ok(msg) => {
25				match msg {
26					HttpInternalMessage::Command {
27						id,
28						request,
29						route,
30					} => {
31						let timestamp = Instant::now();
32
33						// Send the HTTP request
34						let response = match client.send_command(&request) {
35							Ok(response) => Ok(HttpChannelResponse::Command {
36								request_id: id.clone(),
37								result: CommandResult {
38									frames: convert_command_response(response),
39								},
40							}),
41							Err(e) => Err(e),
42						};
43
44						// Route the response
45						match route {
46							HttpResponseRoute::Channel(tx) => {
47								let message = HttpResponseMessage {
48									request_id: id,
49									response,
50									timestamp,
51								};
52								let _ = tx.send(message);
53							}
54						}
55					}
56					HttpInternalMessage::Query {
57						id,
58						request,
59						route,
60					} => {
61						let timestamp = Instant::now();
62
63						// Send the HTTP request
64						let response = match client.send_query(&request) {
65							Ok(response) => Ok(HttpChannelResponse::Query {
66								request_id: id.clone(),
67								result: QueryResult {
68									frames: convert_query_response(response),
69								},
70							}),
71							Err(e) => Err(e),
72						};
73
74						// Route the response
75						match route {
76							HttpResponseRoute::Channel(tx) => {
77								let message = HttpResponseMessage {
78									request_id: id,
79									response,
80									timestamp,
81								};
82								let _ = tx.send(message);
83							}
84						}
85					}
86					HttpInternalMessage::Auth {
87						id,
88						_token: _,
89						route,
90					} => {
91						// For HTTP, authentication is
92						// stateless, so we
93						// just send a success response
94						// In a real implementation,
95						// this might send an auth
96						// request to /v1/auth
97						let timestamp = Instant::now();
98						let response = Ok(HttpChannelResponse::Auth {
99							request_id: id.clone(),
100						});
101
102						match route {
103							HttpResponseRoute::Channel(tx) => {
104								let message = HttpResponseMessage {
105									request_id: id,
106									response,
107									timestamp,
108								};
109								let _ = tx.send(message);
110							}
111						}
112					}
113					HttpInternalMessage::Close => {
114						break;
115					}
116				}
117			}
118			Err(mpsc::RecvTimeoutError::Timeout) => {
119				// Continue loop to check for messages
120				continue;
121			}
122			Err(mpsc::RecvTimeoutError::Disconnected) => {
123				// Channel disconnected, exit worker
124				break;
125			}
126		}
127	}
128}