1use crate::errors::Result;
16use crate::mailbox::Message;
17use crate::process::Process;
18use crate::registry::ProcessRegistry;
19use erltf::OwnedTerm;
20use erltf::types::{Atom, ExternalPid, ExternalReference};
21use std::future::Future;
22use std::sync::Arc;
23
24pub enum CallResult {
25 Reply(OwnedTerm),
26 NoReply,
27}
28
29pub trait GenServer: Send + 'static {
30 fn init(&mut self, args: Vec<OwnedTerm>) -> impl Future<Output = Result<()>> + Send + '_;
31
32 fn handle_call(
33 &mut self,
34 msg: OwnedTerm,
35 from: ExternalPid,
36 ) -> impl Future<Output = Result<CallResult>> + Send + '_;
37
38 fn handle_cast(&mut self, msg: OwnedTerm) -> impl Future<Output = Result<()>> + Send + '_;
39
40 fn handle_info(&mut self, msg: OwnedTerm) -> impl Future<Output = Result<()>> + Send + '_;
41
42 fn terminate(&mut self, _reason: OwnedTerm) -> impl Future<Output = ()> + Send + '_ {
43 async move {}
44 }
45}
46
47pub struct GenServerProcess<T: GenServer> {
48 server: T,
49 call_tag: Atom,
50 cast_tag: Atom,
51 registry: Arc<ProcessRegistry>,
52}
53
54impl<T: GenServer> GenServerProcess<T> {
55 pub fn new(server: T, registry: Arc<ProcessRegistry>) -> Self {
56 Self {
57 server,
58 call_tag: Atom::new("$gen_call"),
59 cast_tag: Atom::new("$gen_cast"),
60 registry,
61 }
62 }
63
64 async fn handle_gen_call(
65 &mut self,
66 from_pid: ExternalPid,
67 reference: ExternalReference,
68 request: OwnedTerm,
69 ) -> Result<()> {
70 let result = self.server.handle_call(request, from_pid.clone()).await?;
71
72 match result {
73 CallResult::Reply(reply) => {
74 let reply_msg = OwnedTerm::Tuple(vec![OwnedTerm::Reference(reference), reply]);
75
76 if let Some(handle) = self.registry.get(&from_pid).await {
77 handle
78 .send(Message::Regular {
79 from: None,
80 body: reply_msg,
81 })
82 .await?;
83 }
84
85 Ok(())
86 }
87 CallResult::NoReply => Ok(()),
88 }
89 }
90
91 async fn handle_gen_cast(&mut self, request: OwnedTerm) -> Result<()> {
92 self.server.handle_cast(request).await
93 }
94}
95
96impl<T: GenServer> Process for GenServerProcess<T> {
97 async fn handle_message(&mut self, msg: Message) -> Result<()> {
98 match msg {
99 Message::Regular { from: _, body } => {
100 if let OwnedTerm::Tuple(elements) = &body
101 && elements.len() >= 2
102 && let OwnedTerm::Atom(tag) = &elements[0]
103 {
104 if tag == &self.call_tag && elements.len() == 3 {
105 if let OwnedTerm::Tuple(from_tuple) = &elements[1]
106 && from_tuple.len() == 2
107 && let OwnedTerm::Pid(from_pid) = &from_tuple[0]
108 && let OwnedTerm::Reference(reference) = &from_tuple[1]
109 {
110 let request = elements[2].clone();
111 return self
112 .handle_gen_call(from_pid.clone(), reference.clone(), request)
113 .await;
114 }
115 } else if tag == &self.cast_tag && elements.len() == 2 {
116 return self.handle_gen_cast(elements[1].clone()).await;
117 }
118 }
119
120 self.server.handle_info(body).await
121 }
122 Message::Control { .. } => Ok(()),
123 Message::Exit { reason, .. } => {
124 self.server.terminate(reason).await;
125 Ok(())
126 }
127 _ => Ok(()),
128 }
129 }
130
131 async fn terminate(&mut self) {
132 self.server
133 .terminate(OwnedTerm::Atom(Atom::new("normal")))
134 .await;
135 }
136}