1use reifydb_auth::service::{AuthResponse, AuthService};
11use reifydb_core::{
12 actors::server::{
13 ServerAuthResponse, ServerLogoutResponse, ServerMessage, ServerResponse, ServerSubscribeResponse,
14 },
15 execution::ExecutionResult,
16};
17use reifydb_engine::engine::StandardEngine;
18use reifydb_runtime::{
19 actor::{
20 context::Context,
21 reply::Reply,
22 traits::{Actor, Directive},
23 },
24 context::clock::Clock,
25};
26use reifydb_type::{params::Params, value::identity::IdentityId};
27
28pub struct ServerActor {
29 engine: StandardEngine,
30 auth_service: AuthService,
31 clock: Clock,
32}
33
34impl ServerActor {
35 pub fn new(engine: StandardEngine, auth_service: AuthService, clock: Clock) -> Self {
36 Self {
37 engine,
38 auth_service,
39 clock,
40 }
41 }
42
43 fn dispatch_execute(
44 &self,
45 identity: IdentityId,
46 statements: Vec<String>,
47 params: Params,
48 reply: Reply<ServerResponse>,
49 execute: impl FnOnce(&StandardEngine, IdentityId, &str, Params) -> ExecutionResult,
50 ) {
51 let combined = statements.join("; ");
52 let t = self.clock.instant();
53 let result = execute(&self.engine, identity, &combined, params);
54 if let Some(err) = result.error {
55 reply.send(ServerResponse::EngineError {
56 diagnostic: Box::new(err.diagnostic()),
57 statement: combined,
58 });
59 } else {
60 reply.send(ServerResponse::Success {
61 frames: result.frames,
62 duration: t.elapsed(),
63 });
64 }
65 }
66}
67
68impl Actor for ServerActor {
69 type State = ();
70 type Message = ServerMessage;
71
72 fn init(&self, _ctx: &Context<Self::Message>) -> Self::State {}
73
74 fn handle(&self, _state: &mut (), msg: ServerMessage, _ctx: &Context<ServerMessage>) -> Directive {
75 match msg {
76 ServerMessage::Query {
77 identity,
78 statements,
79 params,
80 reply,
81 } => {
82 self.dispatch_execute(identity, statements, params, reply, |e, id, s, p| {
83 e.query_as(id, s, p)
84 });
85 }
86 ServerMessage::Command {
87 identity,
88 statements,
89 params,
90 reply,
91 } => {
92 self.dispatch_execute(identity, statements, params, reply, |e, id, s, p| {
93 e.command_as(id, s, p)
94 });
95 }
96 ServerMessage::Admin {
97 identity,
98 statements,
99 params,
100 reply,
101 } => {
102 self.dispatch_execute(identity, statements, params, reply, |e, id, s, p| {
103 e.admin_as(id, s, p)
104 });
105 }
106 ServerMessage::Subscribe {
107 identity,
108 query,
109 reply,
110 } => {
111 let t = self.clock.instant();
112 let result = self.engine.subscribe_as(identity, &query, Params::None);
113 if let Some(err) = result.error {
114 reply.send(ServerSubscribeResponse::EngineError {
115 diagnostic: Box::new(err.diagnostic()),
116 statement: query,
117 });
118 } else {
119 reply.send(ServerSubscribeResponse::Subscribed {
120 frames: result.frames,
121 duration: t.elapsed(),
122 });
123 }
124 }
125 ServerMessage::Authenticate {
126 method,
127 credentials,
128 reply,
129 } => match self.auth_service.authenticate(&method, credentials) {
130 Ok(AuthResponse::Authenticated {
131 identity,
132 token,
133 }) => reply.send(ServerAuthResponse::Authenticated {
134 identity,
135 token,
136 }),
137 Ok(AuthResponse::Challenge {
138 challenge_id,
139 payload,
140 }) => reply.send(ServerAuthResponse::Challenge {
141 challenge_id,
142 payload,
143 }),
144 Ok(AuthResponse::Failed {
145 reason,
146 }) => reply.send(ServerAuthResponse::Failed {
147 reason,
148 }),
149 Err(e) => reply.send(ServerAuthResponse::Error(e.to_string())),
150 },
151 ServerMessage::Logout {
152 token,
153 reply,
154 } => {
155 if self.auth_service.revoke_token(&token) {
156 reply.send(ServerLogoutResponse::Ok);
157 } else {
158 reply.send(ServerLogoutResponse::InvalidToken);
159 }
160 }
161 }
162 Directive::Continue
163 }
164}