noetl-server 2.3.0

NoETL Control Plane - Async Rust server for workflow orchestration
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
//! HTTP handlers for the `/api/internal/*` route family.
//!
//! Mirror of the Python implementation in
//! `repos/noetl/noetl/server/api/internal/` (noetl/noetl v4.10.0).
//! Tracks noetl/server#11 → noetl/ai-meta#49 Phase C.
//!
//! All routes are gated by `RequireInternalApiToken` — a bearer-token
//! extractor that pulls the expected token from the
//! `NOETL_INTERNAL_API_TOKEN` env var and constant-time-compares it to
//! the request's `Authorization: Bearer <token>` header.  System
//! worker pool playbooks carry the token via their K8s ServiceAccount
//! Secret; user playbooks don't have it and get 403.

use std::env;

use axum::{
    extract::{FromRequestParts, State},
    http::{request::Parts, StatusCode},
    Json,
};
use serde::{Deserialize, Serialize};
use subtle::ConstantTimeEq;
use tracing::{debug, info, warn};

use crate::db::DbPool;
use crate::error::AppResult;
use crate::services::internal as svc;

/// Name of the environment variable that holds the expected bearer token
/// for the internal API.  It is read on every request by the
/// `RequireInternalApiToken` extractor.
const TOKEN_ENV: &str = "NOETL_INTERNAL_API_TOKEN";

// ===========================================================================
// Auth extractor
// ===========================================================================

/// Axum extractor that validates the bearer token before the handler
/// runs.  Rejections are `(StatusCode, String)` pairs that axum maps to
/// HTTP responses; the handler never sees an unauthenticated request.
///
/// The type carries no data: taking it as a handler argument is what
/// triggers the check.
///
/// Failure modes (mirror the Python side):
///
/// - 503 `Service Unavailable` if `NOETL_INTERNAL_API_TOKEN` is unset,
///   not valid Unicode, or contains only whitespace in the server env.
///   Intentional — no permissive default for a privileged surface.
/// - 403 `Forbidden` if the `Authorization` header is missing, cannot be
///   read as a visible-ASCII string, is malformed (no `Bearer` scheme or
///   an empty token after it), or the token doesn't match.
#[derive(Debug)]
pub struct RequireInternalApiToken;

impl<S> FromRequestParts<S> for RequireInternalApiToken
where
    S: Send + Sync,
{
    type Rejection = (StatusCode, String);

    /// Validates the request's `Authorization: Bearer <token>` header
    /// against the token configured in `NOETL_INTERNAL_API_TOKEN`.
    ///
    /// Only the request head is inspected; `_state` is unused.
    ///
    /// # Errors
    ///
    /// - `503 Service Unavailable` when the env var is unset, not valid
    ///   Unicode, or blank after trimming.
    /// - `403 Forbidden` when the header is missing, is not a visible-ASCII
    ///   string, does not use the `Bearer` scheme (case-insensitive), carries
    ///   an empty token, or carries a token that does not match.
    async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
        // 1. Server config — token env var must be set.
        //
        // The configured value is trimmed before use.  The presented token
        // is trimmed below, so an env value with surrounding whitespace
        // (e.g. a trailing newline from a mounted secret) could otherwise
        // never be matched by any request.
        let configured = env::var(TOKEN_ENV).unwrap_or_default();
        let expected = configured.trim();
        if expected.is_empty() {
            warn!(
                "Internal API called but {} is not set; rejecting with 503.",
                TOKEN_ENV
            );
            return Err((
                StatusCode::SERVICE_UNAVAILABLE,
                format!(
                    "Internal API not configured: {} env var unset on the server. \
                     Set it to the system worker pool's ServiceAccount token before \
                     calling /api/internal/* endpoints.",
                    TOKEN_ENV
                ),
            ));
        }

        // 2. Authorization header — must exist and be readable as a string.
        let Some(header) = parts.headers.get("authorization") else {
            return Err((
                StatusCode::FORBIDDEN,
                "Internal API requires Authorization header with Bearer token.".to_string(),
            ));
        };
        let header_value = header.to_str().map_err(|_| {
            (
                StatusCode::FORBIDDEN,
                "Internal API Authorization header is not valid ASCII.".to_string(),
            )
        })?;

        // 3. Bearer scheme + non-empty token.  Split on the first space only
        //    so the remainder is treated as the token verbatim (then trimmed).
        let mut pieces = header_value.splitn(2, ' ');
        let scheme = pieces.next().unwrap_or("");
        let token = pieces.next().unwrap_or("").trim();
        if !scheme.eq_ignore_ascii_case("bearer") || token.is_empty() {
            return Err((
                StatusCode::FORBIDDEN,
                "Internal API requires 'Bearer <token>' Authorization scheme.".to_string(),
            ));
        }

        // 4. Constant-time comparison — never use == on secrets.
        let provided = token.as_bytes();
        let expected_bytes = expected.as_bytes();
        if provided.len() != expected_bytes.len()
            || !bool::from(provided.ct_eq(expected_bytes))
        {
            return Err((
                StatusCode::FORBIDDEN,
                "Invalid service-account token for /api/internal/*.".to_string(),
            ));
        }

        Ok(RequireInternalApiToken)
    }
}

// ===========================================================================
// Request/response shapes — byte-identical to the Python side
// ===========================================================================

/// Request body for `POST /api/internal/outbox/claim`.
///
/// `Default` is implemented by hand rather than derived so that
/// `OutboxClaimRequest::default()` agrees with what serde produces when
/// `limit` is omitted from the JSON body.  A derived `Default` would
/// yield `limit = 0` instead.
#[derive(Debug, Deserialize)]
pub struct OutboxClaimRequest {
    /// Value forwarded to `svc::claim_batch` as the batch limit; falls
    /// back to `default_claim_limit()` when absent from the body.
    #[serde(default = "default_claim_limit")]
    pub limit: i64,
}

impl Default for OutboxClaimRequest {
    fn default() -> Self {
        Self {
            limit: default_claim_limit(),
        }
    }
}

/// Fallback for `OutboxClaimRequest::limit` when the field is omitted.
fn default_claim_limit() -> i64 {
    100
}

/// Response body for `POST /api/internal/outbox/claim`.
#[derive(Debug, Serialize)]
pub struct OutboxClaimResponse {
    /// Rows returned by `svc::claim_batch`.
    pub rows: Vec<svc::OutboxRow>,
    /// Number of entries in `rows` (the handler sets it from `rows.len()`).
    pub claimed: i64,
}

/// Request body for `POST /api/internal/outbox/mark-published`.
#[derive(Debug, Deserialize)]
pub struct OutboxMarkPublishedRequest {
    /// Ids of the outbox rows to mark.  Required; the handler rejects an
    /// empty list with a bad-request error.
    pub outbox_ids: Vec<i64>,
}

/// Response body for `POST /api/internal/outbox/mark-published`.
#[derive(Debug, Serialize)]
pub struct OutboxMarkPublishedResponse {
    /// Value returned by `svc::mark_published_batch` — presumably the
    /// number of rows updated; confirm against the service layer.
    pub marked: i64,
}

/// Request body for `POST /api/internal/outbox/mark-failed`.
///
/// All fields are forwarded unchanged to `svc::mark_failed_row`.
#[derive(Debug, Deserialize)]
pub struct OutboxMarkFailedRequest {
    /// Id of the outbox row to mark.  Required.
    pub outbox_id: i64,
    /// Failure description.  Required; the handler rejects an empty
    /// string with a bad-request error.
    pub error: String,
    /// Attempt counter; falls back to `default_attempts()` (1) when
    /// omitted.  NOTE(review): presumably drives the backoff exponent —
    /// confirm in `svc::mark_failed_row`.
    #[serde(default = "default_attempts")]
    pub attempts: i32,
    /// Falls back to `default_max_delay_seconds()` (300) when omitted.
    /// NOTE(review): presumably an upper bound on the retry delay —
    /// confirm in `svc::mark_failed_row`.
    #[serde(default = "default_max_delay_seconds")]
    pub max_delay_seconds: i32,
}

/// Serde fallback for `OutboxMarkFailedRequest::attempts` when the field
/// is missing from the request body.
fn default_attempts() -> i32 {
    const FALLBACK_ATTEMPTS: i32 = 1;
    FALLBACK_ATTEMPTS
}

/// Serde fallback for `OutboxMarkFailedRequest::max_delay_seconds` when
/// the field is missing from the request body.
fn default_max_delay_seconds() -> i32 {
    const FALLBACK_MAX_DELAY_SECONDS: i32 = 300;
    FALLBACK_MAX_DELAY_SECONDS
}

/// Response body for `POST /api/internal/outbox/mark-failed`.
#[derive(Debug, Serialize)]
pub struct OutboxMarkFailedResponse {
    /// The handler always sets this to `true` on the success path.
    pub marked: bool,
    /// Delay returned by `svc::mark_failed_row`.  The handler logs the
    /// same value as `delay_seconds`, so the unit is presumably seconds —
    /// confirm against the service layer.
    pub available_at_in: i64,
}

/// Response body for `GET /api/internal/outbox/pending-count`.
#[derive(Debug, Serialize)]
pub struct OutboxPendingCountResponse {
    /// Value returned by `svc::pending_count`.
    pub pending: i64,
}

/// Request body for `POST /api/internal/events/project`.
#[derive(Debug, Deserialize)]
pub struct EventsProjectRequest {
    /// Events to project.  Required; the handler rejects an empty list
    /// with a bad-request error.
    pub events: Vec<svc::EventEnvelope>,
}

/// Response body for `POST /api/internal/events/project`.
///
/// Both fields come from the `(projected, duplicates)` pair returned by
/// `svc::project_events`.
#[derive(Debug, Serialize)]
pub struct EventsProjectResponse {
    /// First element of the pair — presumably the number of events newly
    /// inserted; confirm against the service layer.
    pub projected: i64,
    /// Second element of the pair — presumably the number of events
    /// skipped as already present; confirm against the service layer.
    pub duplicates: i64,
}

// ===========================================================================
// Route handlers
// ===========================================================================

/// `POST /api/internal/outbox/claim`
///
/// Hands a batch of outbox rows to the caller.  The request's `limit` is
/// passed straight to `svc::claim_batch`; whatever it returns is echoed
/// back together with its length.  Authentication is enforced by the
/// `RequireInternalApiToken` extractor before this body runs.
///
/// # Errors
///
/// Propagates any error returned by `svc::claim_batch`.
#[tracing::instrument(skip(pool, _token), fields(limit = request.limit))]
pub async fn outbox_claim(
    State(pool): State<DbPool>,
    _token: RequireInternalApiToken,
    Json(request): Json<OutboxClaimRequest>,
) -> AppResult<Json<OutboxClaimResponse>> {
    let batch = svc::claim_batch(&pool, request.limit).await?;
    let batch_len = batch.len() as i64;
    debug!(claimed = batch_len, "outbox/claim done");

    let response = OutboxClaimResponse {
        rows: batch,
        claimed: batch_len,
    };
    Ok(Json(response))
}

/// `POST /api/internal/outbox/mark-published`
///
/// Mark a batch of outbox rows PUBLISHED.  Idempotent.
///
/// The span records only `count` (the number of ids).  `request` is
/// skipped explicitly: `#[instrument]` would otherwise Debug-format the
/// whole id list into every span.
///
/// # Errors
///
/// - `AppError::BadRequest` when `outbox_ids` is empty.
/// - Any error returned by `svc::mark_published_batch`.
#[tracing::instrument(
    skip(pool, _token, request),
    fields(count = request.outbox_ids.len())
)]
pub async fn outbox_mark_published(
    State(pool): State<DbPool>,
    _token: RequireInternalApiToken,
    Json(request): Json<OutboxMarkPublishedRequest>,
) -> AppResult<Json<OutboxMarkPublishedResponse>> {
    if request.outbox_ids.is_empty() {
        return Err(crate::error::AppError::BadRequest(
            "outbox_ids must not be empty".to_string(),
        ));
    }
    let marked = svc::mark_published_batch(&pool, &request.outbox_ids).await?;
    debug!(marked, "outbox/mark-published done");
    Ok(Json(OutboxMarkPublishedResponse { marked }))
}

/// `POST /api/internal/outbox/mark-failed`
///
/// Mark a single outbox row FAILED with exponential backoff.
#[tracing::instrument(
    skip(pool, _token),
    fields(outbox_id = request.outbox_id, attempts = request.attempts)
)]
pub async fn outbox_mark_failed(
    State(pool): State<DbPool>,
    _token: RequireInternalApiToken,
    Json(request): Json<OutboxMarkFailedRequest>,
) -> AppResult<Json<OutboxMarkFailedResponse>> {
    if request.error.is_empty() {
        return Err(crate::error::AppError::BadRequest(
            "error must not be empty".to_string(),
        ));
    }
    let delay = svc::mark_failed_row(
        &pool,
        request.outbox_id,
        &request.error,
        request.attempts,
        request.max_delay_seconds,
    )
    .await?;
    info!(delay_seconds = delay, "outbox/mark-failed applied");
    Ok(Json(OutboxMarkFailedResponse {
        marked: true,
        available_at_in: delay,
    }))
}

/// `GET /api/internal/outbox/pending-count`
///
/// Reports the value of `svc::pending_count`.  Intended as the trigger
/// source for the KEDA HTTP scaler.
///
/// # Errors
///
/// Propagates any error returned by `svc::pending_count`.
#[tracing::instrument(skip(pool, _token))]
pub async fn outbox_pending_count(
    State(pool): State<DbPool>,
    _token: RequireInternalApiToken,
) -> AppResult<Json<OutboxPendingCountResponse>> {
    let response = OutboxPendingCountResponse {
        pending: svc::pending_count(&pool).await?,
    };
    Ok(Json(response))
}

/// `POST /api/internal/events/project`
///
/// Batch-INSERT events into `noetl.event`.  Idempotent via
/// `ON CONFLICT (event_id) DO NOTHING`.
///
/// The span records only `batch_size`.  `request` is skipped explicitly:
/// `#[instrument]` would otherwise Debug-format every event envelope in
/// the batch into the span.
///
/// # Errors
///
/// - `AppError::BadRequest` when `events` is empty.
/// - Any error returned by `svc::project_events`.
#[tracing::instrument(
    skip(pool, _token, request),
    fields(batch_size = request.events.len())
)]
pub async fn events_project(
    State(pool): State<DbPool>,
    _token: RequireInternalApiToken,
    Json(request): Json<EventsProjectRequest>,
) -> AppResult<Json<EventsProjectResponse>> {
    if request.events.is_empty() {
        return Err(crate::error::AppError::BadRequest(
            "events must not be empty".to_string(),
        ));
    }
    let (projected, duplicates) = svc::project_events(&pool, &request.events).await?;
    info!(projected, duplicates, "events/project done");
    Ok(Json(EventsProjectResponse {
        projected,
        duplicates,
    }))
}

// ===========================================================================
// Tests — auth extractor (the only logic that doesn't need a real DB)
// ===========================================================================

#[cfg(test)]
mod tests {
    use super::*;
    use axum::body::Body;
    use axum::extract::FromRequestParts;
    use axum::http::{Request, StatusCode};

    /// Helper to invoke the extractor without a router around it.
    ///
    /// `env_token` is the value to place in `NOETL_INTERNAL_API_TOKEN`
    /// (`None` removes the variable); `header` is the raw `Authorization`
    /// header value (`None` sends no such header).  The variable is always
    /// removed again before returning; any pre-existing value is not
    /// restored.
    async fn run_extractor(
        env_token: Option<&str>,
        header: Option<&str>,
    ) -> Result<RequireInternalApiToken, (StatusCode, String)> {
        // Override the process-global env var for the duration of this
        // call.  Every test that calls this helper is annotated
        // `#[serial_test::serial]`, so these tests never touch the
        // variable at the same time as one another.
        //
        // SAFETY: mutation of the environment is serialised among the tests
        // in this module by `#[serial]`.
        // NOTE(review): non-serial tests elsewhere in the crate that read
        // the environment concurrently would still be a hazard — confirm
        // none exist.
        match env_token {
            Some(v) => unsafe { env::set_var(TOKEN_ENV, v) },
            None => unsafe { env::remove_var(TOKEN_ENV) },
        }

        // Build a bare request and keep only its head; the extractor never
        // looks at the body.
        let mut builder = Request::builder().method("GET").uri("/test");
        if let Some(h) = header {
            builder = builder.header("authorization", h);
        }
        let req = builder.body(Body::empty()).unwrap();
        let (mut parts, _body) = req.into_parts();

        let result = <RequireInternalApiToken as FromRequestParts<()>>::from_request_parts(
            &mut parts,
            &(),
        )
        .await;

        // Clean up so the next test starts from an unset variable.
        // SAFETY: same serialisation argument as for the override above.
        unsafe { env::remove_var(TOKEN_ENV) };
        result
    }

    // Tests touch the process-global `NOETL_INTERNAL_API_TOKEN`
    // env var — they must run sequentially or one test's setenv races
    // another's getenv.  `#[serial]` from `serial_test` serialises
    // these without forcing the whole suite to single-threaded mode.

    /// Unset env var → 503, and the message names the variable.
    #[tokio::test]
    #[serial_test::serial]
    async fn rejects_when_env_unset() {
        let err = run_extractor(None, Some("Bearer foo")).await.unwrap_err();
        assert_eq!(err.0, StatusCode::SERVICE_UNAVAILABLE);
        assert!(err.1.contains(TOKEN_ENV));
    }

    /// Whitespace-only env var is treated the same as unset → 503.
    #[tokio::test]
    #[serial_test::serial]
    async fn rejects_when_env_blank() {
        let err = run_extractor(Some("   "), Some("Bearer foo"))
            .await
            .unwrap_err();
        assert_eq!(err.0, StatusCode::SERVICE_UNAVAILABLE);
    }

    /// Missing `Authorization` header → 403 with a hint about `Bearer`.
    #[tokio::test]
    #[serial_test::serial]
    async fn rejects_when_no_authorization_header() {
        let err = run_extractor(Some("secret-123"), None).await.unwrap_err();
        assert_eq!(err.0, StatusCode::FORBIDDEN);
        assert!(err.1.contains("Bearer"));
    }

    /// A correct secret under a non-`Bearer` scheme is still rejected → 403.
    #[tokio::test]
    #[serial_test::serial]
    async fn rejects_non_bearer_scheme() {
        let err = run_extractor(Some("secret-123"), Some("Basic secret-123"))
            .await
            .unwrap_err();
        assert_eq!(err.0, StatusCode::FORBIDDEN);
    }

    /// Mismatched token → 403.
    #[tokio::test]
    #[serial_test::serial]
    async fn rejects_wrong_token() {
        let err = run_extractor(Some("secret-123"), Some("Bearer wrong"))
            .await
            .unwrap_err();
        assert_eq!(err.0, StatusCode::FORBIDDEN);
    }

    /// Matching token under the `Bearer` scheme is accepted.
    #[tokio::test]
    #[serial_test::serial]
    async fn accepts_valid_token() {
        let result = run_extractor(Some("secret-123"), Some("Bearer secret-123")).await;
        assert!(result.is_ok());
    }

    /// The scheme name is matched case-insensitively.
    #[tokio::test]
    #[serial_test::serial]
    async fn accepts_valid_token_case_insensitive_scheme() {
        let result = run_extractor(Some("secret-123"), Some("bearer secret-123")).await;
        assert!(result.is_ok());
    }

    /// `Bearer` followed by nothing but whitespace → 403.
    #[tokio::test]
    #[serial_test::serial]
    async fn rejects_empty_token_after_bearer() {
        let err = run_extractor(Some("secret-123"), Some("Bearer "))
            .await
            .unwrap_err();
        assert_eq!(err.0, StatusCode::FORBIDDEN);
    }
}