1#![deny(unsafe_code)]
2
3use std::sync::Arc;
4
5use async_trait::async_trait;
6use tokio::{self, sync::oneshot, sync::RwLock};
7
8use rmqtt::{
9 context::ServerContext,
10 hook::{Register, Type},
11 macros::Plugin,
12 plugin::{PackageInfo, Plugin},
13 register, Result,
14};
15
16use config::PluginConfig;
17
18mod api;
19mod clients;
20mod config;
21mod handler;
22mod plugin;
23mod prome;
24mod subs;
25mod types;
26
27type ShutdownTX = oneshot::Sender<()>;
28type PluginConfigType = Arc<RwLock<PluginConfig>>;
29
30register!(HttpApiPlugin::new);
31
32#[derive(Plugin)]
33struct HttpApiPlugin {
34 scx: ServerContext,
35 register: Box<dyn Register>,
36 cfg: PluginConfigType,
37 shutdown_tx: Option<ShutdownTX>,
38}
39
40impl HttpApiPlugin {
41 #[inline]
42 async fn new<S: Into<String>>(scx: ServerContext, name: S) -> Result<Self> {
43 let name = name.into();
44 let cfg = Arc::new(RwLock::new(scx.plugins.read_config::<PluginConfig>(&name)?));
45 log::debug!("{} HttpApiPlugin cfg: {:?}", name, cfg.read().await);
46 let register = scx.extends.hook_mgr().register();
47 let shutdown_tx = Some(Self::start(scx.clone(), cfg.clone()).await);
48 Ok(Self { scx, register, cfg, shutdown_tx })
49 }
50
51 async fn start(scx: ServerContext, cfg: PluginConfigType) -> ShutdownTX {
52 let (shutdown_tx, shutdown_rx): (oneshot::Sender<()>, oneshot::Receiver<()>) = oneshot::channel();
53 let workers = cfg.read().await.workers;
54 let http_laddr = cfg.read().await.http_laddr;
55 let _child = std::thread::Builder::new().name("http-api".to_string()).spawn(move || {
56 let cfg1 = cfg.clone();
57 let runner = async move {
58 let laddr = cfg1.read().await.http_laddr;
59 if let Err(e) = api::listen_and_serve(scx, laddr, cfg1, shutdown_rx).await {
60 log::error!("{e:?}");
61 }
62 };
63
64 let rt = tokio::runtime::Builder::new_multi_thread()
65 .enable_all()
66 .worker_threads(workers)
67 .thread_name("http-api-worker")
68 .thread_stack_size(4 * 1024 * 1024)
69 .build()
70 .expect("tokio runtime build failed");
71 rt.block_on(runner);
72 log::info!("Exit HTTP API Server, ..., http://{http_laddr:?}");
73 });
74 shutdown_tx
75 }
76}
77
78#[async_trait]
79impl Plugin for HttpApiPlugin {
80 #[inline]
81 async fn init(&mut self) -> Result<()> {
82 log::info!("{} init", self.name());
83 let mgs_type = self.cfg.read().await.message_type;
84 self.register
85 .add(Type::GrpcMessageReceived, Box::new(handler::HookHandler::new(self.scx.clone(), mgs_type)))
86 .await;
87 Ok(())
88 }
89
90 #[inline]
91 async fn get_config(&self) -> Result<serde_json::Value> {
92 self.cfg.read().await.to_json()
93 }
94
95 #[inline]
96 async fn load_config(&mut self) -> Result<()> {
97 let new_cfg = self.scx.plugins.read_config::<PluginConfig>(self.name())?;
98 if !self.cfg.read().await.changed(&new_cfg) {
99 return Ok(());
100 }
101 let restart_enable = self.cfg.read().await.restart_enable(&new_cfg);
102 if restart_enable {
103 let new_cfg = Arc::new(RwLock::new(new_cfg));
104 if let Some(tx) = self.shutdown_tx.take() {
105 if let Err(e) = tx.send(()) {
106 log::warn!("shutdown_tx send fail, {e:?}");
107 }
108 }
109 self.shutdown_tx = Some(Self::start(self.scx.clone(), new_cfg.clone()).await);
110 self.cfg = new_cfg;
111 } else {
112 *self.cfg.write().await = new_cfg;
113 }
114
115 log::debug!("load_config ok, {:?}", self.cfg);
116 Ok(())
117 }
118
119 #[inline]
120 async fn start(&mut self) -> Result<()> {
121 log::info!("{} start", self.name());
122 self.register.start().await;
123 Ok(())
124 }
125
126 #[inline]
127 async fn stop(&mut self) -> Result<bool> {
128 log::info!("{} stop", self.name());
129 Ok(false)
131 }
132}