flare_rpc_core/app/
app.rs

1use crate::app::AppConfig;
2use crate::discover::{Registration, Registry};
3use anyhow;
4use log::{error, info};
5use std::collections::HashMap;
6use std::net::SocketAddr;
7use tokio::sync::oneshot;
8use uuid;
9use crate::discover::LogRegistry;
10use tonic::transport::Server;
11use std::future::Future;
12
13pub type DefaultApp = App<LogRegistry>;
14
15/// RPC 应用程序
16pub struct App<R>
17where
18    R: Registry,
19{
20    /// 应用配置
21    pub config: AppConfig,
22    /// 服务注册器
23    register: Option<R>,
24}
25
26impl<R> App<R>
27where
28    R: Registry,
29{
30    /// 创建新的应用实例
31    ///
32    /// # Arguments
33    /// * `id` - 应用ID
34    /// * `name` - 应用名称
35    /// * `version` - 应用版本
36    pub fn new(id: &str, name: &str, version: &str) -> Self {
37        Self {
38            config: AppConfig {
39                id: id.to_string(),
40                name: name.to_string(),
41                version: version.to_string(),
42                ..Default::default()
43            },
44            register: None,
45        }
46    }
47
48    /// 创建简单应用实例(使用随机ID)
49    pub fn new_simple(name: &str) -> Self {
50        Self {
51            config: AppConfig {
52                name: name.to_string(),
53                ..Default::default()
54            },
55            register: None,
56        }
57    }
58
59    /// 创建不需要注册的应用实例
60    pub fn new_not_register(id: &str, name: &str, version: &str) -> Self {
61        let mut app = Self::new(id, name, version);
62        app.config.weight = 10;
63        app
64    }
65
66    /// 创建简单的不需要注册的应用实例
67    pub fn new_simple_not_register(name: &str) -> Self {
68        let mut app = Self::new_simple(name);
69        app.config.weight = 10;
70        app
71    }
72
73    /// 设置服务注册器
74    pub fn register(mut self, register: R) -> Self {
75        self.register = Some(register);
76        self
77    }
78    
79
80    /// 添加应用标签
81    pub fn add_tag(&mut self, tag: &str) -> &mut Self {
82        self.config.tags.push(tag.to_string());
83        self
84    }
85
86    /// 添加元数据
87    pub fn add_meta(&mut self, key: &str, value: &str) -> &mut Self {
88        self.config.metadata.insert(key.to_string(), value.to_string());
89        self
90    }
91
92    /// 设置服务权重
93    pub fn set_weight(&mut self, weight: u32) -> &mut Self {
94        self.config.weight = weight;
95        self
96    }
97
98    /// 运行服务器
99    ///
100    /// # Arguments
101    /// * `ip` - 监听IP地址
102    /// * `port` - 监听端口
103    /// * `server_fn` - 服务器函数
104    pub async fn run<F, Fut>(self, ip: &str, port: u16, server_fn: F) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
105    where
106        F: FnOnce(Server) -> Fut + Send + 'static,
107        Fut: Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>> + Send + 'static,
108    {
109        // 准备注册信息
110        let registration = Registration::new(
111            self.config.name.clone(),
112            self.config.id.clone(),
113            self.config.tags,
114            ip.to_string(),
115            port,
116            self.config.weight,
117            self.config.metadata,
118            self.config.version,
119        );
120        
121        let register = if let Some(r) = self.register {
122            r
123        } else {
124            return Err("No registry provided".into());
125        };
126        
127        // 处理服务注册
128        register_server(register.clone(), registration).await?;
129
130        // 启动心跳检查
131        let service_id = self.config.id.clone();
132        let register_clone = register.clone();
133        let heartbeat_handle = tokio::spawn(async move {
134            let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(5));
135            loop {
136                interval.tick().await;
137                if let Err(e) = register_clone.heartbeat(&service_id).await {
138                    log::error!("Heartbeat failed: {}", e);
139                }
140            }
141        });
142
143        // 启动服务器
144        let addr: SocketAddr = format!("{}:{}", ip, port).parse()?;
145        let server = Server::builder();
146        info!("Starting server at: {}", addr);
147
148        // 创建关闭信号通道
149        let (tx, rx) = oneshot::channel();
150        
151        // 启动服务器
152        let server_handle = tokio::spawn(server_fn(server));
153
154        // 监听关闭信号
155        tokio::spawn(async move {
156            let mut term = tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()).unwrap();
157            tokio::select! {
158                _ = tokio::signal::ctrl_c() => {},
159                _ = term.recv() => {},
160            }
161            let _ = tx.send(());
162        });
163
164        // 等待关闭信号
165        rx.await.unwrap();
166        info!("Shutting down gracefully...");
167
168        // 停止心跳
169        heartbeat_handle.abort();
170
171        // 注销服务
172        deregister_server(register, self.config.id).await?;
173
174        // 等待服务器关闭
175        server_handle.await??;
176        Ok(())
177    }
178}
179
180/// 注册服务
181async fn register_server<R>(registry: R, reg: Registration) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
182where
183    R: Registry,
184{
185    registry.register(reg).await?;
186    Ok(())
187}
188
189/// 注销服务
190async fn deregister_server<R>(registry: R, id: String) -> Result<(), Box<dyn std::error::Error + Send + Sync>>
191where
192    R: Registry,
193{
194    match registry.deregister(id.as_str()).await {
195        Ok(_) => Ok(()),
196        Err(e) => {
197            error!("Failed to deregister service: {}", e);
198            Err(anyhow::anyhow!("Failed to deregister service: {}", e).into())
199        }
200    }
201}
202
203/// App Builder
204pub struct AppBuilder<R>
205where
206    R: Registry,
207{
208    id: Option<String>,
209    name: String,
210    version: Option<String>,
211    weight: u32,
212    tags: Vec<String>,
213    metadata: HashMap<String, String>,
214    register: Option<R>,
215}
216
217impl<R> AppBuilder<R>
218where
219    R: Registry,
220{
221    /// 创建新的 Builder
222    pub fn new(name: impl Into<String>) -> Self {
223        Self {
224            id: None,
225            name: name.into(),
226            version: None,
227            weight: 1,
228            tags: Vec::new(),
229            metadata: HashMap::new(),
230            register: None,
231        }
232    }
233
234    /// 设置应用 ID
235    pub fn id(mut self, id: impl Into<String>) -> Self {
236        self.id = Some(id.into());
237        self
238    }
239
240    /// 设置版本
241    pub fn version(mut self, version: impl Into<String>) -> Self {
242        self.version = Some(version.into());
243        self
244    }
245
246    /// 设置权重
247    pub fn weight(mut self, weight: u32) -> Self {
248        self.weight = weight;
249        self
250    }
251
252    /// 添加标签
253    pub fn tag(mut self, tag: impl Into<String>) -> Self {
254        self.tags.push(tag.into());
255        self
256    }
257
258    /// 添加元数据
259    pub fn meta(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
260        self.metadata.insert(key.into(), value.into());
261        self
262    }
263
264    /// 设置注册器
265    pub fn register(mut self, register: R) -> Self {
266        self.register = Some(register);
267        self
268    }
269
270    /// 构建 App 实例
271    pub fn build(self) -> App<R> {
272        App {
273            config: AppConfig {
274                id: self.id.unwrap_or_else(|| uuid::Uuid::new_v4().to_string()),
275                name: self.name,
276                version: self.version.unwrap_or_else(|| "0.1.0".to_string()),
277                weight: self.weight,
278                tags: self.tags,
279                metadata: self.metadata,
280            },
281            register: self.register,
282        }
283    }
284}
285
286impl<R> App<R>
287where
288    R: Registry,
289{
290    /// 创建新的 Builder
291    pub fn builder(name: impl Into<String>) -> AppBuilder<R> {
292        AppBuilder::new(name)
293    }
294}