greentic_runner_host/runner/
mod.rs

1pub mod adapt_messaging;
2pub mod adapt_timer;
3pub mod adapt_webhook;
4pub mod engine;
5pub mod mocks;
6
7use std::net::SocketAddr;
8use std::num::NonZeroUsize;
9use std::sync::Arc;
10use std::time::Instant;
11
12use anyhow::{Context, Result, bail};
13use axum::http::StatusCode;
14use axum::routing::{any, post};
15use axum::{Router, serve};
16use lru::LruCache;
17use parking_lot::Mutex;
18use reqwest::Client;
19use tokio::net::TcpListener;
20use tokio::task::JoinHandle;
21
22use serde_json::Value;
23
24use crate::config::HostConfig;
25use crate::pack::PackRuntime;
26
27pub struct HostServer {
28    addr: SocketAddr,
29    router: Router,
30    config: Arc<HostConfig>,
31    engine: Arc<engine::FlowEngine>,
32    #[allow(dead_code)]
33    timer_handles: Vec<JoinHandle<()>>,
34}
35
36impl HostServer {
37    pub async fn new(config: Arc<HostConfig>, pack: Arc<PackRuntime>, port: u16) -> Result<Self> {
38        let addr = SocketAddr::from(([0, 0, 0, 0], port));
39        let http_client = Client::builder().build()?;
40        let engine = Arc::new(
41            engine::FlowEngine::new(Arc::clone(&pack), Arc::clone(&config))
42                .await
43                .context("failed to prime flow engine")?,
44        );
45        let telegram_capacity = NonZeroUsize::new(TELEGRAM_CACHE_CAPACITY)
46            .expect("telegram cache capacity must be > 0");
47        let webhook_capacity =
48            NonZeroUsize::new(WEBHOOK_CACHE_CAPACITY).expect("webhook cache capacity must be > 0");
49
50        let rate_limiter = RateLimiter::new(
51            config.rate_limits.messaging_send_qps,
52            config.rate_limits.messaging_burst,
53        );
54
55        let state = Arc::new(ServerState {
56            config: Arc::clone(&config),
57            engine: Arc::clone(&engine),
58            telegram_cache: Mutex::new(LruCache::new(telegram_capacity)),
59            webhook_cache: Mutex::new(LruCache::new(webhook_capacity)),
60            http_client,
61            messaging_rate: Mutex::new(rate_limiter),
62        });
63        let router = Router::new()
64            .route(
65                "/messaging/telegram/webhook",
66                post(adapt_messaging::telegram_webhook),
67            )
68            .route("/webhook/:flow_id", any(adapt_webhook::dispatch))
69            .with_state(Arc::clone(&state));
70
71        let timer_handles =
72            adapt_timer::spawn_timers(state).context("failed to spawn timer tasks")?;
73
74        Ok(Self {
75            addr,
76            router,
77            config,
78            engine,
79            timer_handles,
80        })
81    }
82
83    pub async fn serve(self) -> Result<()> {
84        tracing::info!(
85            addr = %self.addr,
86            tenant = %self.config.tenant,
87            flows = self.engine.flows().len(),
88            timers = self.timer_handles.len(),
89            "starting host server"
90        );
91        let listener = TcpListener::bind(self.addr).await?;
92        serve(listener, self.router.into_make_service()).await?;
93        Ok(())
94    }
95}
96
97pub struct ServerState {
98    pub config: Arc<HostConfig>,
99    pub engine: Arc<engine::FlowEngine>,
100    pub telegram_cache: Mutex<LruCache<i64, StatusCode>>,
101    pub webhook_cache: Mutex<LruCache<String, Value>>,
102    pub http_client: Client,
103    pub messaging_rate: Mutex<RateLimiter>,
104}
105
106impl ServerState {
107    pub fn get_secret(&self, key: &str) -> anyhow::Result<String> {
108        if !self.config.secrets_policy.is_allowed(key) {
109            bail!("secret {key} is not permitted by bindings policy");
110        }
111
112        if let Ok(value) = std::env::var(key) {
113            return Ok(value);
114        }
115
116        bail!("secret {key} not found in environment");
117    }
118}
119
120const TELEGRAM_CACHE_CAPACITY: usize = 1024;
121const WEBHOOK_CACHE_CAPACITY: usize = 256;
122
123pub struct RateLimiter {
124    allowance: f64,
125    rate: f64,
126    burst: f64,
127    last_check: Instant,
128}
129
130impl RateLimiter {
131    fn new(qps: u32, burst: u32) -> Self {
132        let rate = qps.max(1) as f64;
133        let burst = burst.max(1) as f64;
134        Self {
135            allowance: burst,
136            rate,
137            burst,
138            last_check: Instant::now(),
139        }
140    }
141
142    fn try_acquire(&mut self) -> bool {
143        let now = Instant::now();
144        let elapsed = now.duration_since(self.last_check).as_secs_f64();
145        self.last_check = now;
146        self.allowance += elapsed * self.rate;
147        if self.allowance > self.burst {
148            self.allowance = self.burst;
149        }
150        if self.allowance < 1.0 {
151            false
152        } else {
153            self.allowance -= 1.0;
154            true
155        }
156    }
157}
158
159#[cfg(test)]
160mod tests {
161    use super::*;
162    use std::thread::sleep;
163    use std::time::Duration;
164
165    #[test]
166    fn rate_limiter_allows_burst_and_refills() {
167        let mut limiter = RateLimiter::new(1, 2);
168        assert!(limiter.try_acquire());
169        assert!(limiter.try_acquire());
170        assert!(!limiter.try_acquire());
171        sleep(Duration::from_millis(1200));
172        assert!(limiter.try_acquire());
173    }
174}