1use std::{future::Future, net::SocketAddr, sync::Arc};
2
3use anyhow::{Context, Result};
4use nestforge_core::{
5 framework_log_event, initialize_module_runtime, Container, InitializedModule, ModuleDefinition,
6};
7use nestforge_microservices::{
8 EventEnvelope, MessageEnvelope, MicroserviceContext, MicroserviceRegistry, TransportMetadata,
9};
10use serde::Serialize;
11use tonic::Status;
12
13pub use prost;
14pub use tonic;
15
/// Listen settings for the gRPC transport server.
#[derive(Clone, Debug)]
pub struct GrpcServerConfig {
    /// Listen address kept as text, e.g. `127.0.0.1:50051`.
    pub addr: String,
}

impl Default for GrpcServerConfig {
    /// Returns a configuration bound to `127.0.0.1:50051`.
    fn default() -> Self {
        GrpcServerConfig {
            addr: String::from("127.0.0.1:50051"),
        }
    }
}
28
29impl GrpcServerConfig {
30 pub fn new(addr: impl Into<String>) -> Self {
31 Self { addr: addr.into() }
32 }
33
34 pub fn socket_addr(&self) -> Result<SocketAddr> {
35 self.addr
36 .parse()
37 .with_context(|| format!("Invalid gRPC listen address `{}`", self.addr))
38 }
39}
40
41#[derive(Clone)]
42pub struct GrpcContext {
43 container: Container,
44}
45
46impl GrpcContext {
47 pub fn new(container: Container) -> Self {
48 Self { container }
49 }
50
51 pub fn container(&self) -> &Container {
52 &self.container
53 }
54
55 pub fn resolve<T>(&self) -> Result<Arc<T>, Status>
56 where
57 T: Send + Sync + 'static,
58 {
59 self.container.resolve::<T>().map_err(|err| {
60 Status::internal(format!(
61 "Failed to resolve dependency `{}`: {}",
62 std::any::type_name::<T>(),
63 err
64 ))
65 })
66 }
67
68 pub fn microservice_context(
69 &self,
70 pattern: impl Into<String>,
71 metadata: TransportMetadata,
72 ) -> MicroserviceContext {
73 MicroserviceContext::new(self.container.clone(), "grpc", pattern, metadata)
74 }
75}
76
77pub async fn dispatch_grpc_message<Payload>(
78 ctx: &GrpcContext,
79 registry: &MicroserviceRegistry,
80 pattern: impl Into<String>,
81 payload: Payload,
82 metadata: TransportMetadata,
83) -> Result<serde_json::Value, Status>
84where
85 Payload: Serialize,
86{
87 let pattern = pattern.into();
88 let envelope = MessageEnvelope::new(pattern.clone(), payload).map_err(map_microservice_error)?;
89 let envelope = envelope.with_metadata(metadata.clone());
90 let context = ctx.microservice_context(pattern, metadata);
91
92 registry
93 .dispatch_message(envelope, context)
94 .await
95 .map_err(map_microservice_error)
96}
97
98pub async fn dispatch_grpc_event<Payload>(
99 ctx: &GrpcContext,
100 registry: &MicroserviceRegistry,
101 pattern: impl Into<String>,
102 payload: Payload,
103 metadata: TransportMetadata,
104) -> Result<(), Status>
105where
106 Payload: Serialize,
107{
108 let pattern = pattern.into();
109 let envelope = EventEnvelope::new(pattern.clone(), payload).map_err(map_microservice_error)?;
110 let envelope = envelope.with_metadata(metadata.clone());
111 let context = ctx.microservice_context(pattern, metadata);
112
113 registry
114 .dispatch_event(envelope, context)
115 .await
116 .map_err(map_microservice_error)
117}
118
119fn map_microservice_error(err: anyhow::Error) -> Status {
120 Status::internal(err.to_string())
121}
122
123pub struct NestForgeGrpcFactory<M: ModuleDefinition> {
124 _marker: std::marker::PhantomData<M>,
125 container: Container,
126 runtime: Arc<InitializedModule>,
127 config: GrpcServerConfig,
128}
129
130impl<M: ModuleDefinition> NestForgeGrpcFactory<M> {
131 pub fn create() -> Result<Self> {
132 let container = Container::new();
133 let runtime = Arc::new(initialize_module_runtime::<M>(&container)?);
134 runtime.run_module_init(&container)?;
135 runtime.run_application_bootstrap(&container)?;
136
137 Ok(Self {
138 _marker: std::marker::PhantomData,
139 container,
140 runtime,
141 config: GrpcServerConfig::default(),
142 })
143 }
144
145 pub fn with_addr(mut self, addr: impl Into<String>) -> Self {
146 self.config = GrpcServerConfig::new(addr);
147 self
148 }
149
150 pub fn with_config(mut self, config: GrpcServerConfig) -> Self {
151 self.config = config;
152 self
153 }
154
155 pub fn container(&self) -> &Container {
156 &self.container
157 }
158
159 pub fn context(&self) -> GrpcContext {
160 GrpcContext::new(self.container.clone())
161 }
162
163 pub fn socket_addr(&self) -> Result<SocketAddr> {
164 self.config.socket_addr()
165 }
166
167 pub async fn listen_with<F, Fut, E>(self, serve: F) -> Result<()>
168 where
169 F: FnOnce(GrpcContext, SocketAddr) -> Fut,
170 Fut: Future<Output = std::result::Result<(), E>>,
171 E: std::error::Error + Send + Sync + 'static,
172 {
173 let runtime = Arc::clone(&self.runtime);
174 let container = self.container.clone();
175 let addr = self.socket_addr()?;
176 framework_log_event("grpc_server_listening", &[("addr", addr.to_string())]);
177 serve(self.context(), addr)
178 .await
179 .map_err(anyhow::Error::new)
180 .context("gRPC transport server failed")?;
181 runtime.run_module_destroy(&container)?;
182 runtime.run_application_shutdown(&container)?;
183 Ok(())
184 }
185}
186
#[cfg(test)]
mod tests {
    use std::sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    };

    use nestforge_microservices::MicroserviceRegistry;

    use super::{dispatch_grpc_event, dispatch_grpc_message, GrpcContext, TransportMetadata};

    /// Shared counter registered in the container so handlers can resolve it.
    #[derive(Clone)]
    struct Counter(Arc<AtomicUsize>);

    /// A message handler resolves `Counter` from the context and its value
    /// comes back from the dispatch adapter as JSON.
    #[tokio::test]
    async fn grpc_dispatch_adapter_invokes_message_registry() {
        let container = nestforge_core::Container::new();
        container
            .register(Counter(Arc::new(AtomicUsize::new(3))))
            .expect("counter should register");
        let ctx = GrpcContext::new(container);
        let registry = MicroserviceRegistry::builder()
            .message("counter.read", |_payload: (), ctx| async move {
                let counter = ctx.resolve::<Counter>()?;
                Ok(counter.0.load(Ordering::Relaxed))
            })
            .build();

        let response = dispatch_grpc_message(
            &ctx,
            &registry,
            "counter.read",
            (),
            TransportMetadata::new().insert("transport", "grpc"),
        )
        .await
        .expect("message should dispatch");

        assert_eq!(response, serde_json::json!(3));
    }

    /// An event handler is invoked exactly once per dispatched event.
    #[tokio::test]
    async fn grpc_dispatch_adapter_invokes_event_registry() {
        let counter = Arc::new(AtomicUsize::new(0));
        let ctx = GrpcContext::new(nestforge_core::Container::new());
        let registry = MicroserviceRegistry::builder()
            .event("counter.bump", {
                let counter = Arc::clone(&counter);
                move |_payload: (), _ctx| {
                    // Each invocation needs its own handle to move into the future.
                    let counter = Arc::clone(&counter);
                    async move {
                        counter.fetch_add(1, Ordering::Relaxed);
                        Ok(())
                    }
                }
            })
            .build();

        dispatch_grpc_event(
            &ctx,
            &registry,
            "counter.bump",
            (),
            TransportMetadata::default(),
        )
        .await
        .expect("event should dispatch");

        assert_eq!(counter.load(Ordering::Relaxed), 1);
    }
}