Skip to main content

mockforge_registry_server/
fly_logs.rs

1//! Fly.io runtime log client for hosted-mock deployments.
2//!
3//! `GET /api/v1/hosted-mocks/{id}/logs` historically returned rows from the
4//! local `deployment_logs` table — deployment lifecycle events (created,
5//! deploying, deploy complete, errors), capped at 100. That endpoint stays as
6//! "events" for the UI's Events tab.
7//!
8//! This module adds the missing piece: actual runtime logs from the Fly
9//! machine running each hosted mock, surfaced via two new endpoints:
10//!
11//! - `GET /api/v1/hosted-mocks/{id}/runtime-logs` — REST pull, last N entries.
12//! - `GET /api/v1/hosted-mocks/{id}/runtime-logs/stream` — SSE that polls Fly
13//!   every couple of seconds and streams new entries to the browser.
14//!
15//! Configuration via environment variables (all optional — if `FLYIO_API_TOKEN`
16//! is unset the endpoints return an empty list and the SSE stream emits a
17//! "not configured" event then closes):
18//!
19//! - `FLYIO_API_TOKEN` — bearer token. Same one the orchestrator uses.
20//! - `FLY_LOGS_URL` — base URL of the Fly logs REST API. Defaults to
21//!   `https://api.fly.io/api/v1`.
22//! - `FLY_LOGS_TIMEOUT_MS` — per-request timeout (default 5000).
23//! - `FLY_LOGS_DEFAULT_LIMIT` — REST default page size (default 200).
24
25use chrono::{DateTime, Utc};
26use reqwest::Client;
27use serde::{Deserialize, Serialize};
28use std::sync::OnceLock;
29use std::time::Duration;
30use tracing::debug;
31
32/// One log line emitted by a Fly machine running a hosted mock.
33#[derive(Debug, Clone, Serialize, Deserialize)]
34pub struct LogEntry {
35    pub timestamp: DateTime<Utc>,
36    pub level: String,
37    pub message: String,
38    #[serde(skip_serializing_if = "Option::is_none")]
39    pub instance: Option<String>,
40    #[serde(skip_serializing_if = "Option::is_none")]
41    pub region: Option<String>,
42}
43
44/// Pull-based Fly logs client. Real-time NATS subscription is a follow-up
45/// (#232 Phase 6 looks at structured shipping from the container instead);
46/// polling buys us a usable runtime-logs view today.
47#[derive(Clone)]
48pub struct FlyLogsClient {
49    base_url: String,
50    token: String,
51    default_limit: u32,
52    http: Client,
53}
54
55impl FlyLogsClient {
56    /// Build a client from environment variables. Returns `None` when the Fly
57    /// API token isn't set — handlers degrade to empty / "not configured".
58    pub fn from_env() -> Option<Self> {
59        let token = std::env::var("FLYIO_API_TOKEN").ok()?;
60        let base_url = std::env::var("FLY_LOGS_URL")
61            .unwrap_or_else(|_| "https://api.fly.io/api/v1".to_string());
62        let timeout_ms = std::env::var("FLY_LOGS_TIMEOUT_MS")
63            .ok()
64            .and_then(|s| s.parse().ok())
65            .unwrap_or(5000);
66        let default_limit = std::env::var("FLY_LOGS_DEFAULT_LIMIT")
67            .ok()
68            .and_then(|s| s.parse().ok())
69            .unwrap_or(200);
70
71        let http = Client::builder().timeout(Duration::from_millis(timeout_ms)).build().ok()?;
72
73        Some(Self {
74            base_url: base_url.trim_end_matches('/').to_string(),
75            token,
76            default_limit,
77            http,
78        })
79    }
80
81    /// Fetch recent log entries for a Fly app.
82    ///
83    /// `since` filters to entries strictly newer than the given timestamp —
84    /// used by the SSE poll loop to avoid re-emitting lines.
85    /// `limit` overrides the env-configured default.
86    pub async fn fetch_recent(
87        &self,
88        app_name: &str,
89        since: Option<DateTime<Utc>>,
90        limit: Option<u32>,
91    ) -> Result<Vec<LogEntry>, FlyLogsError> {
92        let limit = limit.unwrap_or(self.default_limit);
93        let url = format!("{}/apps/{}/logs", self.base_url, app_name);
94        let mut req = self.http.get(&url).bearer_auth(&self.token).query(&[("limit", limit)]);
95        if let Some(ts) = since {
96            // Fly accepts an RFC3339 `since` parameter on its logs endpoint.
97            req = req.query(&[("since", ts.to_rfc3339())]);
98        }
99
100        debug!(app = %app_name, "Fetching Fly runtime logs");
101        let resp = req.send().await.map_err(FlyLogsError::Request)?;
102
103        if !resp.status().is_success() {
104            let status = resp.status();
105            let body = resp.text().await.unwrap_or_default();
106            return Err(FlyLogsError::Status {
107                status: status.as_u16(),
108                body,
109            });
110        }
111
112        let raw = resp.text().await.map_err(FlyLogsError::Request)?;
113        Ok(parse_log_payload(&raw))
114    }
115}
116
117/// Parse a Fly logs response. The API has shifted shape over the years; we try
118/// two common forms and fall back to NDJSON. Anything we can't parse is
119/// dropped silently — better an incomplete log view than a 500 on the
120/// admin UI.
121fn parse_log_payload(raw: &str) -> Vec<LogEntry> {
122    // Form 1: { "data": [ { ...attributes } ] } — the JSON:API-ish wrapping.
123    if let Ok(wrapped) = serde_json::from_str::<JsonApiWrapper>(raw) {
124        return wrapped.into_entries();
125    }
126    // Form 2: bare JSON array of entries.
127    if let Ok(entries) = serde_json::from_str::<Vec<RawLogLine>>(raw) {
128        return entries.into_iter().filter_map(RawLogLine::into_entry).collect();
129    }
130    // Form 3: NDJSON — one JSON object per line.
131    let mut out = Vec::new();
132    for line in raw.lines() {
133        let trimmed = line.trim();
134        if trimmed.is_empty() {
135            continue;
136        }
137        if let Ok(line_struct) = serde_json::from_str::<RawLogLine>(trimmed) {
138            if let Some(entry) = line_struct.into_entry() {
139                out.push(entry);
140            }
141        }
142    }
143    out
144}
145
146/// Process-wide lazily-initialised client.
147static GLOBAL: OnceLock<Option<FlyLogsClient>> = OnceLock::new();
148
149pub fn global() -> Option<&'static FlyLogsClient> {
150    GLOBAL.get_or_init(FlyLogsClient::from_env).as_ref()
151}
152
153#[derive(Debug, thiserror::Error)]
154pub enum FlyLogsError {
155    #[error("Fly logs request failed: {0}")]
156    Request(#[from] reqwest::Error),
157    #[error("Fly logs returned non-success status {status}: {body}")]
158    Status { status: u16, body: String },
159}
160
161#[derive(Debug, Deserialize)]
162struct JsonApiWrapper {
163    data: Vec<JsonApiResource>,
164}
165
166#[derive(Debug, Deserialize)]
167struct JsonApiResource {
168    #[serde(default)]
169    attributes: Option<RawLogLine>,
170}
171
172impl JsonApiWrapper {
173    fn into_entries(self) -> Vec<LogEntry> {
174        self.data
175            .into_iter()
176            .filter_map(|r| r.attributes.and_then(|a| a.into_entry()))
177            .collect()
178    }
179}
180
181#[derive(Debug, Deserialize)]
182struct RawLogLine {
183    #[serde(default)]
184    timestamp: Option<String>,
185    #[serde(default)]
186    level: Option<String>,
187    #[serde(default)]
188    message: Option<String>,
189    #[serde(default, alias = "instance_id")]
190    instance: Option<String>,
191    #[serde(default)]
192    region: Option<String>,
193}
194
195impl RawLogLine {
196    fn into_entry(self) -> Option<LogEntry> {
197        let message = self.message?;
198        let timestamp = self
199            .timestamp
200            .as_deref()
201            .and_then(|s| DateTime::parse_from_rfc3339(s).ok())
202            .map(|d| d.with_timezone(&Utc))
203            .unwrap_or_else(Utc::now);
204        Some(LogEntry {
205            timestamp,
206            level: self.level.unwrap_or_else(|| "info".to_string()),
207            message,
208            instance: self.instance,
209            region: self.region,
210        })
211    }
212}
213
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn parses_json_api_wrapped_form() {
        let raw = r#"{
            "data": [
                {
                    "attributes": {
                        "timestamp": "2026-04-24T15:00:00Z",
                        "level": "info",
                        "message": "GET /users 200",
                        "instance_id": "abc123",
                        "region": "iad"
                    }
                }
            ]
        }"#;
        let entries = parse_log_payload(raw);
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].message, "GET /users 200");
        assert_eq!(entries[0].instance.as_deref(), Some("abc123"));
        assert_eq!(entries[0].region.as_deref(), Some("iad"));
    }

    #[test]
    fn skips_json_api_resources_without_attributes() {
        let raw = r#"{ "data": [ { "id": "1" }, { "attributes": { "message": "kept" } } ] }"#;
        let entries = parse_log_payload(raw);
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].message, "kept");
    }

    #[test]
    fn parses_bare_array_form() {
        let raw = r#"[
            { "timestamp": "2026-04-24T15:00:00Z", "level": "info", "message": "hi" },
            { "timestamp": "2026-04-24T15:00:01Z", "level": "warn", "message": "ho" }
        ]"#;
        let entries = parse_log_payload(raw);
        assert_eq!(entries.len(), 2);
        assert_eq!(entries[1].level, "warn");
    }

    #[test]
    fn accepts_instance_key_and_defaults_level_to_info() {
        let raw = r#"[ { "message": "boot", "instance": "m-1" } ]"#;
        let entries = parse_log_payload(raw);
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].level, "info");
        assert_eq!(entries[0].instance.as_deref(), Some("m-1"));
        assert!(entries[0].region.is_none());
    }

    #[test]
    fn converts_offset_timestamps_to_utc() {
        let raw = r#"[ { "timestamp": "2026-04-24T17:00:00+02:00", "message": "x" } ]"#;
        let entries = parse_log_payload(raw);
        let expected: DateTime<Utc> = "2026-04-24T15:00:00Z".parse().unwrap();
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].timestamp, expected);
    }

    #[test]
    fn parses_ndjson_form() {
        let raw = r#"
            {"timestamp":"2026-04-24T15:00:00Z","level":"info","message":"line1"}
            {"timestamp":"2026-04-24T15:00:01Z","level":"error","message":"line2"}
        "#;
        let entries = parse_log_payload(raw);
        assert_eq!(entries.len(), 2);
        assert_eq!(entries[0].message, "line1");
    }

    #[test]
    fn ndjson_ignores_unparseable_lines() {
        let raw = "not json\n{\"message\":\"ok\"}\n";
        let entries = parse_log_payload(raw);
        assert_eq!(entries.len(), 1);
        assert_eq!(entries[0].message, "ok");
    }

    #[test]
    fn skips_lines_without_messages() {
        let raw = r#"[ { "timestamp": "2026-04-24T15:00:00Z" } ]"#;
        let entries = parse_log_payload(raw);
        assert!(entries.is_empty());
    }

    #[test]
    fn from_env_returns_none_without_token() {
        // NOTE: mutates process-wide env; under Rust 2024 edition this call
        // must be wrapped in `unsafe`.
        std::env::remove_var("FLYIO_API_TOKEN");
        assert!(FlyLogsClient::from_env().is_none());
    }
}