1use std::{future::Future, net::SocketAddr, sync::Arc};
2
3use anyhow::{Context, Result};
4use nestforge_core::{
5 framework_log_event, initialize_module_runtime, Container, InitializedModule, ModuleDefinition,
6};
7use nestforge_microservices::{
8 EventEnvelope, MessageEnvelope, MicroserviceContext, MicroserviceRegistry, TransportMetadata,
9};
10use serde::Serialize;
11use tonic::Status;
12
13pub use prost;
14pub use tonic;
15
/// Listen-address configuration for the gRPC transport server.
///
/// The address is stored as the raw string supplied by the caller; no
/// validation happens when the value is constructed.
///
/// `PartialEq`/`Eq` are derived so configurations can be compared directly
/// (for example in assertions).
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct GrpcServerConfig {
    /// Listen address text, e.g. `127.0.0.1:50051`.
    pub addr: String,
}
20
21impl Default for GrpcServerConfig {
22 fn default() -> Self {
23 Self {
24 addr: "127.0.0.1:50051".to_string(),
25 }
26 }
27}
28
29impl GrpcServerConfig {
30 pub fn new(addr: impl Into<String>) -> Self {
31 Self { addr: addr.into() }
32 }
33
34 pub fn socket_addr(&self) -> Result<SocketAddr> {
35 self.addr
36 .parse()
37 .with_context(|| format!("Invalid gRPC listen address `{}`", self.addr))
38 }
39}
40
41#[derive(Clone)]
42pub struct GrpcContext {
43 container: Container,
44}
45
46impl GrpcContext {
47 pub fn new(container: Container) -> Self {
48 Self { container }
49 }
50
51 pub fn container(&self) -> &Container {
52 &self.container
53 }
54
55 pub fn resolve<T>(&self) -> Result<Arc<T>, Status>
56 where
57 T: Send + Sync + 'static,
58 {
59 self.container.resolve::<T>().map_err(|err| {
60 Status::internal(format!(
61 "Failed to resolve dependency `{}`: {}",
62 std::any::type_name::<T>(),
63 err
64 ))
65 })
66 }
67
68 pub fn microservice_context(
69 &self,
70 pattern: impl Into<String>,
71 metadata: TransportMetadata,
72 ) -> MicroserviceContext {
73 MicroserviceContext::new(self.container.clone(), "grpc", pattern, metadata)
74 }
75}
76
77pub async fn dispatch_grpc_message<Payload>(
78 ctx: &GrpcContext,
79 registry: &MicroserviceRegistry,
80 pattern: impl Into<String>,
81 payload: Payload,
82 metadata: TransportMetadata,
83) -> Result<serde_json::Value, Status>
84where
85 Payload: Serialize,
86{
87 let pattern = pattern.into();
88 let envelope =
89 MessageEnvelope::new(pattern.clone(), payload).map_err(map_microservice_error)?;
90 let envelope = envelope.with_metadata(metadata.clone());
91 let context = ctx.microservice_context(pattern, metadata);
92
93 registry
94 .dispatch_message(envelope, context)
95 .await
96 .map_err(map_microservice_error)
97}
98
99pub async fn dispatch_grpc_event<Payload>(
100 ctx: &GrpcContext,
101 registry: &MicroserviceRegistry,
102 pattern: impl Into<String>,
103 payload: Payload,
104 metadata: TransportMetadata,
105) -> Result<(), Status>
106where
107 Payload: Serialize,
108{
109 let pattern = pattern.into();
110 let envelope = EventEnvelope::new(pattern.clone(), payload).map_err(map_microservice_error)?;
111 let envelope = envelope.with_metadata(metadata.clone());
112 let context = ctx.microservice_context(pattern, metadata);
113
114 registry
115 .dispatch_event(envelope, context)
116 .await
117 .map_err(map_microservice_error)
118}
119
120fn map_microservice_error(err: anyhow::Error) -> Status {
121 Status::internal(err.to_string())
122}
123
124pub struct NestForgeGrpcFactory<M: ModuleDefinition> {
125 _marker: std::marker::PhantomData<M>,
126 container: Container,
127 runtime: Arc<InitializedModule>,
128 config: GrpcServerConfig,
129}
130
131impl<M: ModuleDefinition> NestForgeGrpcFactory<M> {
132 pub fn create() -> Result<Self> {
133 let container = Container::new();
134 let runtime = Arc::new(initialize_module_runtime::<M>(&container)?);
135 runtime.run_module_init(&container)?;
136 runtime.run_application_bootstrap(&container)?;
137
138 Ok(Self {
139 _marker: std::marker::PhantomData,
140 container,
141 runtime,
142 config: GrpcServerConfig::default(),
143 })
144 }
145
146 pub fn with_addr(mut self, addr: impl Into<String>) -> Self {
147 self.config = GrpcServerConfig::new(addr);
148 self
149 }
150
151 pub fn with_config(mut self, config: GrpcServerConfig) -> Self {
152 self.config = config;
153 self
154 }
155
156 pub fn container(&self) -> &Container {
157 &self.container
158 }
159
160 pub fn context(&self) -> GrpcContext {
161 GrpcContext::new(self.container.clone())
162 }
163
164 pub fn socket_addr(&self) -> Result<SocketAddr> {
165 self.config.socket_addr()
166 }
167
168 pub async fn listen_with<F, Fut, E>(self, serve: F) -> Result<()>
169 where
170 F: FnOnce(GrpcContext, SocketAddr) -> Fut,
171 Fut: Future<Output = std::result::Result<(), E>>,
172 E: std::error::Error + Send + Sync + 'static,
173 {
174 let runtime = Arc::clone(&self.runtime);
175 let container = self.container.clone();
176 let addr = self.socket_addr()?;
177 framework_log_event("grpc_server_listening", &[("addr", addr.to_string())]);
178 serve(self.context(), addr)
179 .await
180 .map_err(anyhow::Error::new)
181 .context("gRPC transport server failed")?;
182 runtime.run_module_destroy(&container)?;
183 runtime.run_application_shutdown(&container)?;
184 Ok(())
185 }
186}
187
#[cfg(test)]
mod tests {
    use std::sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    };

    use nestforge_microservices::MicroserviceRegistry;

    use super::{dispatch_grpc_event, dispatch_grpc_message, GrpcContext, TransportMetadata};

    /// Shared counter registered in the container so a handler can resolve it.
    #[derive(Clone)]
    struct Counter(Arc<AtomicUsize>);

    /// A message handler resolves a dependency through the dispatched context
    /// and its return value comes back as JSON.
    #[tokio::test]
    async fn grpc_dispatch_adapter_invokes_message_registry() {
        let container = nestforge_core::Container::new();
        container
            .register(Counter(Arc::new(AtomicUsize::new(3))))
            .expect("counter should register");
        let ctx = GrpcContext::new(container);
        let registry = MicroserviceRegistry::builder()
            .message("counter.read", |_payload: (), ctx| async move {
                let counter = ctx.resolve::<Counter>()?;
                Ok(counter.0.load(Ordering::Relaxed))
            })
            .build();

        let response = dispatch_grpc_message(
            &ctx,
            &registry,
            "counter.read",
            (),
            TransportMetadata::new().insert("transport", "grpc"),
        )
        .await
        .expect("message should dispatch");

        assert_eq!(response, serde_json::json!(3));
    }

    /// An event handler runs exactly once per dispatched event.
    #[tokio::test]
    async fn grpc_dispatch_adapter_invokes_event_registry() {
        let counter = Arc::new(AtomicUsize::new(0));
        let ctx = GrpcContext::new(nestforge_core::Container::new());
        let registry = MicroserviceRegistry::builder()
            .event("counter.bump", {
                let counter = Arc::clone(&counter);
                move |_payload: (), _ctx| {
                    // Each invocation gets its own handle so the returned
                    // future owns what it captures.
                    let counter = Arc::clone(&counter);
                    async move {
                        counter.fetch_add(1, Ordering::Relaxed);
                        Ok(())
                    }
                }
            })
            .build();

        dispatch_grpc_event(
            &ctx,
            &registry,
            "counter.bump",
            (),
            TransportMetadata::default(),
        )
        .await
        .expect("event should dispatch");

        assert_eq!(counter.load(Ordering::Relaxed), 1);
    }
}