rmqtt_http_api/
lib.rs

1#![deny(unsafe_code)]
2
3use std::sync::Arc;
4
5use async_trait::async_trait;
6use tokio::{self, sync::oneshot, sync::RwLock};
7
8use rmqtt::{
9    context::ServerContext,
10    hook::{Register, Type},
11    macros::Plugin,
12    plugin::{PackageInfo, Plugin},
13    register, Result,
14};
15
16use config::PluginConfig;
17
18mod api;
19mod clients;
20mod config;
21mod handler;
22mod plugin;
23mod prome;
24mod subs;
25mod types;
26
/// Sending half of the one-shot `()` channel; `HttpApiPlugin::start` returns it
/// and passes the matching receiver to the HTTP server task.
27type ShutdownTX = oneshot::Sender<()>;
/// Plugin configuration wrapped in `Arc<RwLock<..>>`; the same handle is cloned
/// into the HTTP server task and kept by the plugin.
28type PluginConfigType = Arc<RwLock<PluginConfig>>;
29
// Hands `HttpApiPlugin::new` to rmqtt's `register!` macro.
// NOTE(review): presumably this is the plugin's registration entry point — confirm against the macro docs.
30register!(HttpApiPlugin::new);
31
32#[derive(Plugin)]
33struct HttpApiPlugin {
34    scx: ServerContext,
35    register: Box<dyn Register>,
36    cfg: PluginConfigType,
37    shutdown_tx: Option<ShutdownTX>,
38}
39
40impl HttpApiPlugin {
41    #[inline]
42    async fn new<S: Into<String>>(scx: ServerContext, name: S) -> Result<Self> {
43        let name = name.into();
44        let cfg = scx.plugins.read_config_default::<PluginConfig>(&name);
45        log::info!("{} HttpApiPlugin cfg: {:?}", name, cfg);
46        let cfg = Arc::new(RwLock::new(cfg?));
47        let register = scx.extends.hook_mgr().register();
48        let shutdown_tx = Some(Self::start(scx.clone(), cfg.clone()).await);
49        Ok(Self { scx, register, cfg, shutdown_tx })
50    }
51
52    async fn start(scx: ServerContext, cfg: PluginConfigType) -> ShutdownTX {
53        let (shutdown_tx, shutdown_rx): (oneshot::Sender<()>, oneshot::Receiver<()>) = oneshot::channel();
54        let http_laddr = cfg.read().await.http_laddr;
55        tokio::spawn(async move {
56            if let Err(e) = api::listen_and_serve(scx, http_laddr, cfg, shutdown_rx).await {
57                log::error!("{e:?}");
58            }
59            log::info!("Exit HTTP API Server, ..., http://{http_laddr:?}");
60        });
61        shutdown_tx
62    }
63}
64
65#[async_trait]
66impl Plugin for HttpApiPlugin {
67    #[inline]
68    async fn init(&mut self) -> Result<()> {
69        log::info!("{} init", self.name());
70        let mgs_type = self.cfg.read().await.message_type;
71        self.register
72            .add(Type::GrpcMessageReceived, Box::new(handler::HookHandler::new(self.scx.clone(), mgs_type)))
73            .await;
74        Ok(())
75    }
76
77    #[inline]
78    async fn get_config(&self) -> Result<serde_json::Value> {
79        self.cfg.read().await.to_json()
80    }
81
82    #[inline]
83    async fn load_config(&mut self) -> Result<()> {
84        let new_cfg = self.scx.plugins.read_config::<PluginConfig>(self.name())?;
85        if !self.cfg.read().await.changed(&new_cfg) {
86            return Ok(());
87        }
88        let restart_enable = self.cfg.read().await.restart_enable(&new_cfg);
89        if restart_enable {
90            let new_cfg = Arc::new(RwLock::new(new_cfg));
91            if let Some(tx) = self.shutdown_tx.take() {
92                if let Err(e) = tx.send(()) {
93                    log::warn!("shutdown_tx send fail, {e:?}");
94                }
95            }
96            self.shutdown_tx = Some(Self::start(self.scx.clone(), new_cfg.clone()).await);
97            self.cfg = new_cfg;
98        } else {
99            *self.cfg.write().await = new_cfg;
100        }
101
102        log::debug!("load_config ok,  {:?}", self.cfg);
103        Ok(())
104    }
105
106    #[inline]
107    async fn start(&mut self) -> Result<()> {
108        log::info!("{} start", self.name());
109        self.register.start().await;
110        Ok(())
111    }
112
113    #[inline]
114    async fn stop(&mut self) -> Result<bool> {
115        log::info!("{} stop", self.name());
116        //self.register.stop().await;
117        Ok(false)
118    }
119}