1use crate::app::AppConfig;
2use crate::discover::{Registration, Registry};
3use anyhow;
4use log::{error, info};
5use std::collections::HashMap;
6use std::net::SocketAddr;
7use tokio::sync::oneshot;
8use uuid;
9use crate::discover::LogRegistry;
10use tonic::transport::Server;
11use std::future::Future;
12
/// [`App`] specialised to the [`LogRegistry`] registry implementation.
pub type DefaultApp = App<LogRegistry>;
14
15pub struct App<R>
17where
18 R: Registry,
19{
20 pub config: AppConfig,
22 register: Option<R>,
24}
25
26impl<R> App<R>
27where
28 R: Registry,
29{
30 pub fn new(id: &str, name: &str, version: &str) -> Self {
37 Self {
38 config: AppConfig {
39 id: id.to_string(),
40 name: name.to_string(),
41 version: version.to_string(),
42 ..Default::default()
43 },
44 register: None,
45 }
46 }
47
48 pub fn new_simple(name: &str) -> Self {
50 Self {
51 config: AppConfig {
52 name: name.to_string(),
53 ..Default::default()
54 },
55 register: None,
56 }
57 }
58
59 pub fn new_not_register(id: &str, name: &str, version: &str) -> Self {
61 let mut app = Self::new(id, name, version);
62 app.config.weight = 10;
63 app
64 }
65
66 pub fn new_simple_not_register(name: &str) -> Self {
68 let mut app = Self::new_simple(name);
69 app.config.weight = 10;
70 app
71 }
72
73 pub fn register(mut self, register: R) -> Self {
75 self.register = Some(register);
76 self
77 }
78
79
80 pub fn add_tag(&mut self, tag: &str) -> &mut Self {
82 self.config.tags.push(tag.to_string());
83 self
84 }
85
86 pub fn add_meta(&mut self, key: &str, value: &str) -> &mut Self {
88 self.config.metadata.insert(key.to_string(), value.to_string());
89 self
90 }
91
92 pub fn set_weight(&mut self, weight: u32) -> &mut Self {
94 self.config.weight = weight;
95 self
96 }
97
98 pub async fn run<F, Fut>(self, ip: &str, port: u16, server_fn: F) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
105 where
106 F: FnOnce(Server) -> Fut + Send + 'static,
107 Fut: Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>> + Send + 'static,
108 {
109 let registration = Registration::new(
111 self.config.name.clone(),
112 self.config.id.clone(),
113 self.config.tags,
114 ip.to_string(),
115 port,
116 self.config.weight,
117 self.config.metadata,
118 self.config.version,
119 );
120
121 let register = if let Some(r) = self.register {
122 r
123 } else {
124 return Err("No registry provided".into());
125 };
126
127 register_server(register.clone(), registration).await?;
129
130 let service_id = self.config.id.clone();
132 let register_clone = register.clone();
133 let heartbeat_handle = tokio::spawn(async move {
134 let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(5));
135 loop {
136 interval.tick().await;
137 if let Err(e) = register_clone.heartbeat(&service_id).await {
138 log::error!("Heartbeat failed: {}", e);
139 }
140 }
141 });
142
143 let addr: SocketAddr = format!("{}:{}", ip, port).parse()?;
145 let server = Server::builder();
146 info!("Starting server at: {}", addr);
147
148 let (tx, rx) = oneshot::channel();
150
151 let server_handle = tokio::spawn(server_fn(server));
153
154 tokio::spawn(async move {
156 let mut term = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()).unwrap();
157 tokio::select! {
158 _ = tokio::signal::ctrl_c() => {},
159 _ = term.recv() => {},
160 }
161 let _ = tx.send(());
162 });
163
164 rx.await.unwrap();
166 info!("Shutting down gracefully...");
167
168 heartbeat_handle.abort();
170
171 deregister_server(register, self.config.id).await?;
173
174 server_handle.await??;
176 Ok(())
177 }
178}
179
180async fn register_server<R>(registry: R, reg: Registration) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
182where
183 R: Registry,
184{
185 registry.register(reg).await?;
186 Ok(())
187}
188
189async fn deregister_server<R>(registry: R, id: String) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
191where
192 R: Registry,
193{
194 match registry.deregister(id.as_str()).await {
195 Ok(_) => Ok(()),
196 Err(e) => {
197 error!("Failed to deregister service: {}", e);
198 Err(anyhow::anyhow!("Failed to deregister service: {}", e).into())
199 }
200 }
201}
202
203pub struct AppBuilder<R>
205where
206 R: Registry,
207{
208 id: Option<String>,
209 name: String,
210 version: Option<String>,
211 weight: u32,
212 tags: Vec<String>,
213 metadata: HashMap<String, String>,
214 register: Option<R>,
215}
216
217impl<R> AppBuilder<R>
218where
219 R: Registry,
220{
221 pub fn new(name: impl Into<String>) -> Self {
223 Self {
224 id: None,
225 name: name.into(),
226 version: None,
227 weight: 1,
228 tags: Vec::new(),
229 metadata: HashMap::new(),
230 register: None,
231 }
232 }
233
234 pub fn id(mut self, id: impl Into<String>) -> Self {
236 self.id = Some(id.into());
237 self
238 }
239
240 pub fn version(mut self, version: impl Into<String>) -> Self {
242 self.version = Some(version.into());
243 self
244 }
245
246 pub fn weight(mut self, weight: u32) -> Self {
248 self.weight = weight;
249 self
250 }
251
252 pub fn tag(mut self, tag: impl Into<String>) -> Self {
254 self.tags.push(tag.into());
255 self
256 }
257
258 pub fn meta(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
260 self.metadata.insert(key.into(), value.into());
261 self
262 }
263
264 pub fn register(mut self, register: R) -> Self {
266 self.register = Some(register);
267 self
268 }
269
270 pub fn build(self) -> App<R> {
272 App {
273 config: AppConfig {
274 id: self.id.unwrap_or_else(|| uuid::Uuid::new_v4().to_string()),
275 name: self.name,
276 version: self.version.unwrap_or_else(|| "0.1.0".to_string()),
277 weight: self.weight,
278 tags: self.tags,
279 metadata: self.metadata,
280 },
281 register: self.register,
282 }
283 }
284}
285
286impl<R> App<R>
287where
288 R: Registry,
289{
290 pub fn builder(name: impl Into<String>) -> AppBuilder<R> {
292 AppBuilder::new(name)
293 }
294}