reifydb_client/http/session/
channel.rs

1// Copyright (c) reifydb.com 2025
2// This file is licensed under the MIT
3
4use std::{sync::mpsc, time::Instant};
5
6use reifydb_type::{Error, diagnostic::internal};
7
8use crate::{
9	CommandRequest, Params, QueryRequest,
10	http::{
11		client::HttpClient,
12		message::{HttpInternalMessage, HttpResponseRoute},
13	},
14	utils::generate_request_id,
15};
16
17/// HTTP Channel response enum for different response types
18#[derive(Debug)]
19pub enum HttpChannelResponse {
20	/// Authentication response
21	Auth {
22		request_id: String,
23	},
24	/// Command execution response with frames
25	Command {
26		request_id: String,
27		result: crate::session::CommandResult,
28	},
29	/// Query execution response with frames
30	Query {
31		request_id: String,
32		result: crate::session::QueryResult,
33	},
34}
35
36/// HTTP Response message for channel sessions
37#[derive(Debug)]
38pub struct HttpResponseMessage {
39	pub request_id: String,
40	pub response: Result<HttpChannelResponse, Error>,
41	pub timestamp: Instant,
42}
43
44/// A channel-based HTTP session for message-passing style communication
45pub struct HttpChannelSession {
46	client: HttpClient,
47	token: Option<String>,
48	response_tx: mpsc::Sender<HttpResponseMessage>,
49}
50
51impl HttpChannelSession {
52	/// Create a new channel HTTP session
53	pub fn new(
54		host: &str,
55		port: u16,
56		token: Option<String>,
57	) -> Result<(Self, mpsc::Receiver<HttpResponseMessage>), Error> {
58		let client = HttpClient::new((host, port))
59			.map_err(|e| Error(internal(format!("Failed to create client: {}", e))))?;
60		Self::from_client(client, token)
61	}
62
63	/// Create a new channel HTTP session from an existing client
64	pub fn from_client(
65		client: HttpClient,
66		token: Option<String>,
67	) -> Result<(Self, mpsc::Receiver<HttpResponseMessage>), Error> {
68		let (tx, rx) = mpsc::channel();
69
70		let session = Self {
71			client,
72			token: token.clone(),
73			response_tx: tx,
74		};
75
76		// Authenticate if token provided
77		if token.is_some() {
78			let _ = session.authenticate();
79		}
80
81		Ok((session, rx))
82	}
83
84	/// Create from URL (e.g., "http://localhost:8080")
85	pub fn from_url(
86		url: &str,
87		token: Option<String>,
88	) -> Result<(Self, mpsc::Receiver<HttpResponseMessage>), Error> {
89		let client = HttpClient::from_url(url).map_err(|e| Error(internal(format!("Invalid URL: {}", e))))?;
90		Self::from_client(client, token)
91	}
92
93	/// Authenticate (response arrives on channel)
94	fn authenticate(&self) -> Result<String, Error> {
95		if self.token.is_none() {
96			return Ok(String::new());
97		}
98
99		let id = generate_request_id();
100
101		// Send auth message to worker thread
102		if let Err(e) = self.client.command_tx().send(HttpInternalMessage::Auth {
103			id: id.clone(),
104			_token: self.token.clone(),
105			route: HttpResponseRoute::Channel(self.response_tx.clone()),
106		}) {
107			return Err(Error(internal(format!("Failed to send auth request: {}", e))));
108		}
109
110		Ok(id)
111	}
112
113	/// Send a command (response arrives on channel)
114	pub fn command(&self, rql: &str, params: Option<Params>) -> Result<String, Box<dyn std::error::Error>> {
115		let id = generate_request_id();
116
117		let request = CommandRequest {
118			statements: vec![rql.to_string()],
119			params,
120		};
121
122		// Send command message to worker thread
123		if let Err(e) = self.client.command_tx().send(HttpInternalMessage::Command {
124			id: id.clone(),
125			request,
126			route: HttpResponseRoute::Channel(self.response_tx.clone()),
127		}) {
128			return Err(format!("Failed to send command request: {}", e).into());
129		}
130
131		Ok(id)
132	}
133
134	/// Send a query (response arrives on channel)
135	pub fn query(&self, rql: &str, params: Option<Params>) -> Result<String, Box<dyn std::error::Error>> {
136		let id = generate_request_id();
137
138		let request = QueryRequest {
139			statements: vec![rql.to_string()],
140			params,
141		};
142
143		// Send query message to worker thread
144		if let Err(e) = self.client.command_tx().send(HttpInternalMessage::Query {
145			id: id.clone(),
146			request,
147			route: HttpResponseRoute::Channel(self.response_tx.clone()),
148		}) {
149			return Err(format!("Failed to send query request: {}", e).into());
150		}
151
152		Ok(id)
153	}
154}