rmqtt_http_api/
lib.rs

1#![deny(unsafe_code)]
2
3use std::sync::Arc;
4
5use async_trait::async_trait;
6use tokio::{self, sync::oneshot, sync::RwLock};
7
8use rmqtt::{
9    context::ServerContext,
10    hook::{Register, Type},
11    macros::Plugin,
12    plugin::{PackageInfo, Plugin},
13    register, Result,
14};
15
16use config::PluginConfig;
17
18mod api;
19mod clients;
20mod config;
21mod handler;
22mod plugin;
23mod prome;
24mod subs;
25mod types;
26
27type ShutdownTX = oneshot::Sender<()>;
28type PluginConfigType = Arc<RwLock<PluginConfig>>;
29
30register!(HttpApiPlugin::new);
31
32#[derive(Plugin)]
33struct HttpApiPlugin {
34    scx: ServerContext,
35    register: Box<dyn Register>,
36    cfg: PluginConfigType,
37    shutdown_tx: Option<ShutdownTX>,
38}
39
40impl HttpApiPlugin {
41    #[inline]
42    async fn new<S: Into<String>>(scx: ServerContext, name: S) -> Result<Self> {
43        let name = name.into();
44        let cfg = Arc::new(RwLock::new(scx.plugins.read_config::<PluginConfig>(&name)?));
45        log::debug!("{} HttpApiPlugin cfg: {:?}", name, cfg.read().await);
46        let register = scx.extends.hook_mgr().register();
47        let shutdown_tx = Some(Self::start(scx.clone(), cfg.clone()).await);
48        Ok(Self { scx, register, cfg, shutdown_tx })
49    }
50
51    async fn start(scx: ServerContext, cfg: PluginConfigType) -> ShutdownTX {
52        let (shutdown_tx, shutdown_rx): (oneshot::Sender<()>, oneshot::Receiver<()>) = oneshot::channel();
53        let workers = cfg.read().await.workers;
54        let http_laddr = cfg.read().await.http_laddr;
55        let _child = std::thread::Builder::new().name("http-api".to_string()).spawn(move || {
56            let cfg1 = cfg.clone();
57            let runner = async move {
58                let laddr = cfg1.read().await.http_laddr;
59                if let Err(e) = api::listen_and_serve(scx, laddr, cfg1, shutdown_rx).await {
60                    log::error!("{e:?}");
61                }
62            };
63
64            let rt = tokio::runtime::Builder::new_multi_thread()
65                .enable_all()
66                .worker_threads(workers)
67                .thread_name("http-api-worker")
68                .thread_stack_size(4 * 1024 * 1024)
69                .build()
70                .expect("tokio runtime build failed");
71            rt.block_on(runner);
72            log::info!("Exit HTTP API Server, ..., http://{http_laddr:?}");
73        });
74        shutdown_tx
75    }
76}
77
78#[async_trait]
79impl Plugin for HttpApiPlugin {
80    #[inline]
81    async fn init(&mut self) -> Result<()> {
82        log::info!("{} init", self.name());
83        let mgs_type = self.cfg.read().await.message_type;
84        self.register
85            .add(Type::GrpcMessageReceived, Box::new(handler::HookHandler::new(self.scx.clone(), mgs_type)))
86            .await;
87        Ok(())
88    }
89
90    #[inline]
91    async fn get_config(&self) -> Result<serde_json::Value> {
92        self.cfg.read().await.to_json()
93    }
94
95    #[inline]
96    async fn load_config(&mut self) -> Result<()> {
97        let new_cfg = self.scx.plugins.read_config::<PluginConfig>(self.name())?;
98        if !self.cfg.read().await.changed(&new_cfg) {
99            return Ok(());
100        }
101        let restart_enable = self.cfg.read().await.restart_enable(&new_cfg);
102        if restart_enable {
103            let new_cfg = Arc::new(RwLock::new(new_cfg));
104            if let Some(tx) = self.shutdown_tx.take() {
105                if let Err(e) = tx.send(()) {
106                    log::warn!("shutdown_tx send fail, {e:?}");
107                }
108            }
109            self.shutdown_tx = Some(Self::start(self.scx.clone(), new_cfg.clone()).await);
110            self.cfg = new_cfg;
111        } else {
112            *self.cfg.write().await = new_cfg;
113        }
114
115        log::debug!("load_config ok,  {:?}", self.cfg);
116        Ok(())
117    }
118
119    #[inline]
120    async fn start(&mut self) -> Result<()> {
121        log::info!("{} start", self.name());
122        self.register.start().await;
123        Ok(())
124    }
125
126    #[inline]
127    async fn stop(&mut self) -> Result<bool> {
128        log::info!("{} stop", self.name());
129        //self.register.stop().await;
130        Ok(false)
131    }
132}