Skip to main content

reifydb_sub_server/
actor.rs

1// SPDX-License-Identifier: Apache-2.0
2// Copyright (c) 2025 ReifyDB
3
4//! Unified server actor for HTTP, gRPC, and WebSocket transports.
5//!
6//! The same `handle()` code runs in both native (rayon pool) and DST modes.
7//! Protocol-specific concerns (serialization, HTTP status codes, etc.) live in
8//! the transport layer — this actor only does engine dispatch and auth.
9
10use reifydb_auth::service::{AuthResponse, AuthService};
11use reifydb_core::{
12	actors::server::{
13		ServerAuthResponse, ServerLogoutResponse, ServerMessage, ServerResponse, ServerSubscribeResponse,
14	},
15	execution::ExecutionResult,
16};
17use reifydb_engine::engine::StandardEngine;
18use reifydb_runtime::{
19	actor::{
20		context::Context,
21		reply::Reply,
22		traits::{Actor, Directive},
23	},
24	context::clock::Clock,
25};
26use reifydb_type::{params::Params, value::identity::IdentityId};
27
28pub struct ServerActor {
29	engine: StandardEngine,
30	auth_service: AuthService,
31	clock: Clock,
32}
33
34impl ServerActor {
35	pub fn new(engine: StandardEngine, auth_service: AuthService, clock: Clock) -> Self {
36		Self {
37			engine,
38			auth_service,
39			clock,
40		}
41	}
42
43	fn dispatch_execute(
44		&self,
45		identity: IdentityId,
46		statements: Vec<String>,
47		params: Params,
48		reply: Reply<ServerResponse>,
49		execute: impl FnOnce(&StandardEngine, IdentityId, &str, Params) -> ExecutionResult,
50	) {
51		let combined = statements.join("; ");
52		let t = self.clock.instant();
53		let result = execute(&self.engine, identity, &combined, params);
54		if let Some(err) = result.error {
55			reply.send(ServerResponse::EngineError {
56				diagnostic: Box::new(err.diagnostic()),
57				statement: combined,
58			});
59		} else {
60			reply.send(ServerResponse::Success {
61				frames: result.frames,
62				duration: t.elapsed(),
63			});
64		}
65	}
66}
67
68impl Actor for ServerActor {
69	type State = ();
70	type Message = ServerMessage;
71
72	fn init(&self, _ctx: &Context<Self::Message>) -> Self::State {}
73
74	fn handle(&self, _state: &mut (), msg: ServerMessage, _ctx: &Context<ServerMessage>) -> Directive {
75		match msg {
76			ServerMessage::Query {
77				identity,
78				statements,
79				params,
80				reply,
81			} => {
82				self.dispatch_execute(identity, statements, params, reply, |e, id, s, p| {
83					e.query_as(id, s, p)
84				});
85			}
86			ServerMessage::Command {
87				identity,
88				statements,
89				params,
90				reply,
91			} => {
92				self.dispatch_execute(identity, statements, params, reply, |e, id, s, p| {
93					e.command_as(id, s, p)
94				});
95			}
96			ServerMessage::Admin {
97				identity,
98				statements,
99				params,
100				reply,
101			} => {
102				self.dispatch_execute(identity, statements, params, reply, |e, id, s, p| {
103					e.admin_as(id, s, p)
104				});
105			}
106			ServerMessage::Subscribe {
107				identity,
108				query,
109				reply,
110			} => {
111				let t = self.clock.instant();
112				let result = self.engine.subscribe_as(identity, &query, Params::None);
113				if let Some(err) = result.error {
114					reply.send(ServerSubscribeResponse::EngineError {
115						diagnostic: Box::new(err.diagnostic()),
116						statement: query,
117					});
118				} else {
119					reply.send(ServerSubscribeResponse::Subscribed {
120						frames: result.frames,
121						duration: t.elapsed(),
122					});
123				}
124			}
125			ServerMessage::Authenticate {
126				method,
127				credentials,
128				reply,
129			} => match self.auth_service.authenticate(&method, credentials) {
130				Ok(AuthResponse::Authenticated {
131					identity,
132					token,
133				}) => reply.send(ServerAuthResponse::Authenticated {
134					identity,
135					token,
136				}),
137				Ok(AuthResponse::Challenge {
138					challenge_id,
139					payload,
140				}) => reply.send(ServerAuthResponse::Challenge {
141					challenge_id,
142					payload,
143				}),
144				Ok(AuthResponse::Failed {
145					reason,
146				}) => reply.send(ServerAuthResponse::Failed {
147					reason,
148				}),
149				Err(e) => reply.send(ServerAuthResponse::Error(e.to_string())),
150			},
151			ServerMessage::Logout {
152				token,
153				reply,
154			} => {
155				if self.auth_service.revoke_token(&token) {
156					reply.send(ServerLogoutResponse::Ok);
157				} else {
158					reply.send(ServerLogoutResponse::InvalidToken);
159				}
160			}
161		}
162		Directive::Continue
163	}
164}