greentic_runner_host/runner/
mod.rs1pub mod adapt_messaging;
2pub mod adapt_timer;
3pub mod adapt_webhook;
4pub mod engine;
5pub mod mocks;
6
7use std::net::SocketAddr;
8use std::num::NonZeroUsize;
9use std::sync::Arc;
10use std::time::Instant;
11
12use anyhow::{Context, Result, bail};
13use axum::http::StatusCode;
14use axum::routing::{any, post};
15use axum::{Router, serve};
16use lru::LruCache;
17use parking_lot::Mutex;
18use reqwest::Client;
19use tokio::net::TcpListener;
20use tokio::task::JoinHandle;
21
22use serde_json::Value;
23
24use crate::config::HostConfig;
25use crate::pack::PackRuntime;
26
27pub struct HostServer {
28 addr: SocketAddr,
29 router: Router,
30 config: Arc<HostConfig>,
31 engine: Arc<engine::FlowEngine>,
32 #[allow(dead_code)]
33 timer_handles: Vec<JoinHandle<()>>,
34}
35
36impl HostServer {
37 pub async fn new(config: Arc<HostConfig>, pack: Arc<PackRuntime>, port: u16) -> Result<Self> {
38 let addr = SocketAddr::from(([0, 0, 0, 0], port));
39 let http_client = Client::builder().build()?;
40 let engine = Arc::new(
41 engine::FlowEngine::new(Arc::clone(&pack), Arc::clone(&config))
42 .await
43 .context("failed to prime flow engine")?,
44 );
45 let telegram_capacity = NonZeroUsize::new(TELEGRAM_CACHE_CAPACITY)
46 .expect("telegram cache capacity must be > 0");
47 let webhook_capacity =
48 NonZeroUsize::new(WEBHOOK_CACHE_CAPACITY).expect("webhook cache capacity must be > 0");
49
50 let rate_limiter = RateLimiter::new(
51 config.rate_limits.messaging_send_qps,
52 config.rate_limits.messaging_burst,
53 );
54
55 let state = Arc::new(ServerState {
56 config: Arc::clone(&config),
57 engine: Arc::clone(&engine),
58 telegram_cache: Mutex::new(LruCache::new(telegram_capacity)),
59 webhook_cache: Mutex::new(LruCache::new(webhook_capacity)),
60 http_client,
61 messaging_rate: Mutex::new(rate_limiter),
62 });
63 let router = Router::new()
64 .route(
65 "/messaging/telegram/webhook",
66 post(adapt_messaging::telegram_webhook),
67 )
68 .route("/webhook/:flow_id", any(adapt_webhook::dispatch))
69 .with_state(Arc::clone(&state));
70
71 let timer_handles =
72 adapt_timer::spawn_timers(state).context("failed to spawn timer tasks")?;
73
74 Ok(Self {
75 addr,
76 router,
77 config,
78 engine,
79 timer_handles,
80 })
81 }
82
83 pub async fn serve(self) -> Result<()> {
84 tracing::info!(
85 addr = %self.addr,
86 tenant = %self.config.tenant,
87 flows = self.engine.flows().len(),
88 timers = self.timer_handles.len(),
89 "starting host server"
90 );
91 let listener = TcpListener::bind(self.addr).await?;
92 serve(listener, self.router.into_make_service()).await?;
93 Ok(())
94 }
95}
96
97pub struct ServerState {
98 pub config: Arc<HostConfig>,
99 pub engine: Arc<engine::FlowEngine>,
100 pub telegram_cache: Mutex<LruCache<i64, StatusCode>>,
101 pub webhook_cache: Mutex<LruCache<String, Value>>,
102 pub http_client: Client,
103 pub messaging_rate: Mutex<RateLimiter>,
104}
105
106impl ServerState {
107 pub fn get_secret(&self, key: &str) -> anyhow::Result<String> {
108 if !self.config.secrets_policy.is_allowed(key) {
109 bail!("secret {key} is not permitted by bindings policy");
110 }
111
112 if let Ok(value) = std::env::var(key) {
113 return Ok(value);
114 }
115
116 bail!("secret {key} not found in environment");
117 }
118}
119
/// Maximum number of entries kept in the Telegram LRU cache. Must be non-zero.
const TELEGRAM_CACHE_CAPACITY: usize = 1_024;
/// Maximum number of entries kept in the webhook LRU cache. Must be non-zero.
const WEBHOOK_CACHE_CAPACITY: usize = 256;
122
/// Token-bucket rate limiter.
///
/// The bucket starts full, refills continuously at `rate` tokens per second,
/// and never holds more than `burst` tokens. Each successful acquisition
/// spends one token.
pub struct RateLimiter {
    /// Tokens currently available (fractional while refilling).
    allowance: f64,
    /// Refill speed in tokens per second.
    rate: f64,
    /// Bucket capacity, i.e. the upper bound for `allowance`.
    burst: f64,
    /// Moment of the most recent `try_acquire` call (or of construction).
    last_check: Instant,
}

impl RateLimiter {
    /// Creates a limiter with a full bucket.
    ///
    /// A `qps` or `burst` of zero is raised to one, so the limiter always
    /// lets at least one request through eventually.
    fn new(qps: u32, burst: u32) -> Self {
        let capacity = f64::from(burst.max(1));
        let refill_per_second = f64::from(qps.max(1));
        Self {
            allowance: capacity,
            rate: refill_per_second,
            burst: capacity,
            last_check: Instant::now(),
        }
    }

    /// Tries to spend one token, returning `true` when the call is permitted.
    ///
    /// The bucket is first topped up for the time elapsed since the previous
    /// call; a denied call still advances the refill clock.
    fn try_acquire(&mut self) -> bool {
        let current = Instant::now();
        let refilled = current.duration_since(self.last_check).as_secs_f64() * self.rate;
        self.last_check = current;

        // Top up, but never beyond the bucket capacity.
        self.allowance = (self.allowance + refilled).min(self.burst);

        let granted = self.allowance >= 1.0;
        if granted {
            self.allowance -= 1.0;
        }
        granted
    }
}
158
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread::sleep;
    use std::time::Duration;

    /// The limiter grants the whole burst, denies the next call, and grants
    /// again once enough time has passed to refill one token.
    #[test]
    fn rate_limiter_allows_burst_and_refills() {
        let mut limiter = RateLimiter::new(1, 2);
        assert!(limiter.try_acquire());
        assert!(limiter.try_acquire());
        assert!(!limiter.try_acquire());

        // Simulate 1.2 s passing by rewinding the limiter's clock instead of
        // blocking the test thread. Fall back to a real sleep only if the
        // monotonic clock cannot represent an instant that far in the past.
        let refill_window = Duration::from_millis(1200);
        match limiter.last_check.checked_sub(refill_window) {
            Some(earlier) => limiter.last_check = earlier,
            None => sleep(refill_window),
        }
        assert!(limiter.try_acquire());
    }
}