1#![deny(unsafe_code)]
2
3use std::sync::Arc;
4
5use async_trait::async_trait;
6use tokio::{self, sync::oneshot, sync::RwLock};
7
8use rmqtt::{
9 context::ServerContext,
10 hook::{Register, Type},
11 macros::Plugin,
12 plugin::{PackageInfo, Plugin},
13 register, Result,
14};
15
16use config::PluginConfig;
17
18mod api;
19mod clients;
20mod config;
21mod handler;
22mod plugin;
23mod prome;
24mod subs;
25mod types;
26
/// Sending half of the one-shot channel created in `HttpApiPlugin::start`;
/// the matching receiver is handed to `api::listen_and_serve`.
type ShutdownTX = oneshot::Sender<()>;
/// Plugin configuration shared behind an `Arc<RwLock<_>>` between the plugin
/// and the spawned HTTP server task.
type PluginConfigType = Arc<RwLock<PluginConfig>>;

// Hands `HttpApiPlugin::new` to rmqtt's `register!` macro as the plugin constructor.
register!(HttpApiPlugin::new);
31
/// Plugin state for the HTTP API: the server context, the hook register, the
/// shared configuration and the handle used to signal the running HTTP server
/// task.
#[derive(Plugin)]
struct HttpApiPlugin {
    /// Server context; cloned into the HTTP server task and the hook handler.
    scx: ServerContext,
    /// Hook register obtained from `scx.extends.hook_mgr().register()`.
    register: Box<dyn Register>,
    /// Current plugin configuration, shared with the HTTP server task.
    cfg: PluginConfigType,
    /// Sender paired with the receiver given to `api::listen_and_serve`;
    /// taken and fired in `load_config` before a new server task is started.
    shutdown_tx: Option<ShutdownTX>,
}
39
40impl HttpApiPlugin {
41 #[inline]
42 async fn new<S: Into<String>>(scx: ServerContext, name: S) -> Result<Self> {
43 let name = name.into();
44 let cfg = scx.plugins.read_config_default::<PluginConfig>(&name);
45 log::info!("{} HttpApiPlugin cfg: {:?}", name, cfg);
46 let cfg = Arc::new(RwLock::new(cfg?));
47 let register = scx.extends.hook_mgr().register();
48 let shutdown_tx = Some(Self::start(scx.clone(), cfg.clone()).await);
49 Ok(Self { scx, register, cfg, shutdown_tx })
50 }
51
52 async fn start(scx: ServerContext, cfg: PluginConfigType) -> ShutdownTX {
53 let (shutdown_tx, shutdown_rx): (oneshot::Sender<()>, oneshot::Receiver<()>) = oneshot::channel();
54 let http_laddr = cfg.read().await.http_laddr;
55 tokio::spawn(async move {
56 if let Err(e) = api::listen_and_serve(scx, http_laddr, cfg, shutdown_rx).await {
57 log::error!("{e:?}");
58 }
59 log::info!("Exit HTTP API Server, ..., http://{http_laddr:?}");
60 });
61 shutdown_tx
62 }
63}
64
65#[async_trait]
66impl Plugin for HttpApiPlugin {
67 #[inline]
68 async fn init(&mut self) -> Result<()> {
69 log::info!("{} init", self.name());
70 let mgs_type = self.cfg.read().await.message_type;
71 self.register
72 .add(Type::GrpcMessageReceived, Box::new(handler::HookHandler::new(self.scx.clone(), mgs_type)))
73 .await;
74 Ok(())
75 }
76
77 #[inline]
78 async fn get_config(&self) -> Result<serde_json::Value> {
79 self.cfg.read().await.to_json()
80 }
81
82 #[inline]
83 async fn load_config(&mut self) -> Result<()> {
84 let new_cfg = self.scx.plugins.read_config::<PluginConfig>(self.name())?;
85 if !self.cfg.read().await.changed(&new_cfg) {
86 return Ok(());
87 }
88 let restart_enable = self.cfg.read().await.restart_enable(&new_cfg);
89 if restart_enable {
90 let new_cfg = Arc::new(RwLock::new(new_cfg));
91 if let Some(tx) = self.shutdown_tx.take() {
92 if let Err(e) = tx.send(()) {
93 log::warn!("shutdown_tx send fail, {e:?}");
94 }
95 }
96 self.shutdown_tx = Some(Self::start(self.scx.clone(), new_cfg.clone()).await);
97 self.cfg = new_cfg;
98 } else {
99 *self.cfg.write().await = new_cfg;
100 }
101
102 log::debug!("load_config ok, {:?}", self.cfg);
103 Ok(())
104 }
105
106 #[inline]
107 async fn start(&mut self) -> Result<()> {
108 log::info!("{} start", self.name());
109 self.register.start().await;
110 Ok(())
111 }
112
113 #[inline]
114 async fn stop(&mut self) -> Result<bool> {
115 log::info!("{} stop", self.name());
116 Ok(false)
118 }
119}