aion-server 0.2.0

Deployable HTTP, gRPC, WebSocket, and worker endpoint for Aion workflows.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
//! Prometheus metrics registry and recording helpers.

use std::sync::Arc;
use std::time::{Duration, Instant};

use aion_core::{Event, TimerId, WorkflowFilter, WorkflowId, WorkflowSummary};
use aion_store::{
    EventStore, ReadableEventStore, RunSummary, StoreError, TimerEntry, WritableEventStore,
    WriteToken,
};
use async_trait::async_trait;
use axum::http::header::CONTENT_TYPE;
use axum::http::{HeaderValue, StatusCode};
use axum::response::{IntoResponse, Response};
use chrono::{DateTime, Utc};
use prometheus::{
    Encoder, HistogramOpts, HistogramVec, IntCounterVec, IntGaugeVec, Opts, Registry, TextEncoder,
};
use thiserror::Error;

/// `Content-Type` value sent with the `/metrics` body: prometheus text exposition format 0.0.4.
const TEXT_FORMAT: &str = "text/plain; version=0.0.4; charset=utf-8";

/// Failure raised while building the prometheus registry or rendering its exposition text.
#[derive(Debug, Error)]
pub enum MetricsError {
    /// Prometheus refused to create or register one of the server's metric collectors.
    #[error("failed to register prometheus metric: {0}")]
    Register(#[from] prometheus::Error),
    /// The text encoder could not serialize the gathered metric families; carries the
    /// encoder's error message.
    #[error("failed to encode prometheus metrics: {0}")]
    Encode(String),
}

/// Handle to the server's prometheus metrics.
///
/// Cloning only bumps the reference count on the shared registry state, so every clone
/// records into the same registry.
#[derive(Clone, Debug)]
pub struct Metrics {
    inner: Arc<MetricsInner>,
}

/// Shared state behind [`Metrics`]: the registry plus a handle to each metric vector
/// registered in it.
#[derive(Debug)]
struct MetricsInner {
    /// Registry that every collector below is registered into and gathered from.
    registry: Registry,
    /// `aion_workflows_started_total{namespace, workflow_type}`.
    workflows_started: IntCounterVec,
    /// `aion_workflows_completed_total{namespace, status}`.
    workflows_completed: IntCounterVec,
    /// `aion_activities_dispatched_total{namespace, activity_type}`.
    activities_dispatched: IntCounterVec,
    /// `aion_activities_completed_total{namespace, outcome}`.
    activities_completed: IntCounterVec,
    /// `aion_activity_duration_seconds{namespace, activity_type}` histogram.
    activity_duration: HistogramVec,
    /// `aion_store_operation_duration_seconds{operation}` histogram.
    store_operation_duration: HistogramVec,
    /// `aion_connected_workers{namespace}` gauge.
    connected_workers: IntGaugeVec,
    /// `aion_inflight_activities{namespace}` gauge.
    inflight_activities: IntGaugeVec,
    /// `aion_signals_delivered_total{namespace, residency}`.
    signals_delivered: IntCounterVec,
    /// `aion_schedules_fired_total{namespace}`.
    schedules_fired: IntCounterVec,
}

impl MetricsInner {
    /// Register every metric vector held by `self` into `self.registry`.
    ///
    /// Collectors are registered in field order. Registration stops at the first collector
    /// prometheus rejects, and that error is returned unchanged.
    fn register_collectors(&self) -> Result<(), prometheus::Error> {
        // Cloning a prometheus vector shares its underlying storage, so series recorded
        // through `self` are the ones the registry later gathers.
        let collectors: [Box<dyn prometheus::core::Collector>; 10] = [
            Box::new(self.workflows_started.clone()),
            Box::new(self.workflows_completed.clone()),
            Box::new(self.activities_dispatched.clone()),
            Box::new(self.activities_completed.clone()),
            Box::new(self.activity_duration.clone()),
            Box::new(self.store_operation_duration.clone()),
            Box::new(self.connected_workers.clone()),
            Box::new(self.inflight_activities.clone()),
            Box::new(self.signals_delivered.clone()),
            Box::new(self.schedules_fired.clone()),
        ];
        for collector in collectors {
            self.registry.register(collector)?;
        }
        Ok(())
    }
}

impl Metrics {
    /// Construct the server metrics registry and register all exported metrics.
    ///
    /// # Errors
    ///
    /// Returns [`MetricsError::Register`] if prometheus rejects a metric descriptor.
    pub fn new() -> Result<Self, MetricsError> {
        let inner = build_metrics_inner()?;
        inner.register_collectors()?;
        initialize_default_label_sets(&inner);
        Ok(Self {
            inner: Arc::new(inner),
        })
    }

    /// Encode all currently gathered metrics in prometheus text exposition format.
    ///
    /// # Errors
    ///
    /// Returns [`MetricsError::Encode`] if prometheus cannot encode gathered metrics.
    pub fn encode(&self) -> Result<Vec<u8>, MetricsError> {
        let encoder = TextEncoder::new();
        let families = self.inner.registry.gather();
        let mut buffer = Vec::new();
        encoder
            .encode(&families, &mut buffer)
            .map_err(|error| MetricsError::Encode(error.to_string()))?;
        Ok(buffer)
    }

    /// Increment the workflow-start counter.
    pub fn workflow_started(&self, namespace: &str, workflow_type: &str) {
        self.inner
            .workflows_started
            .with_label_values(&[namespace, workflow_type])
            .inc();
    }

    /// Increment the workflow-terminal counter.
    pub fn workflow_completed(&self, namespace: &str, status: &str) {
        self.inner
            .workflows_completed
            .with_label_values(&[namespace, status])
            .inc();
    }

    /// Increment the activity-dispatch counter and in-flight gauge.
    pub fn activity_dispatched(&self, namespace: &str, activity_type: &str) {
        self.inner
            .activities_dispatched
            .with_label_values(&[namespace, activity_type])
            .inc();
        self.inner
            .inflight_activities
            .with_label_values(&[namespace])
            .inc();
    }

    /// Increment the activity-completion counter, observe duration, and decrement in-flight gauge.
    pub fn activity_completed(
        &self,
        namespace: &str,
        activity_type: &str,
        outcome: &str,
        duration: Duration,
    ) {
        self.inner
            .activities_completed
            .with_label_values(&[namespace, outcome])
            .inc();
        self.inner
            .activity_duration
            .with_label_values(&[namespace, activity_type])
            .observe(duration.as_secs_f64());
        self.inner
            .inflight_activities
            .with_label_values(&[namespace])
            .dec();
    }

    /// Decrement in-flight activity gauge when dispatch fails before a result can arrive.
    pub fn activity_abandoned(&self, namespace: &str) {
        self.inner
            .inflight_activities
            .with_label_values(&[namespace])
            .dec();
    }

    /// Observe a store operation duration.
    pub fn store_operation(&self, operation: &str, duration: Duration) {
        self.inner
            .store_operation_duration
            .with_label_values(&[operation])
            .observe(duration.as_secs_f64());
    }

    /// Increment connected worker gauge for a namespace.
    pub fn worker_connected(&self, namespace: &str) {
        self.inner
            .connected_workers
            .with_label_values(&[namespace])
            .inc();
    }

    /// Decrement connected worker gauge for a namespace.
    pub fn worker_disconnected(&self, namespace: &str) {
        self.inner
            .connected_workers
            .with_label_values(&[namespace])
            .dec();
    }

    /// Increment signal delivery counter.
    pub fn signal_delivered(&self, namespace: &str, residency: &str) {
        self.inner
            .signals_delivered
            .with_label_values(&[namespace, residency])
            .inc();
    }

    /// Increment schedule-fired counter.
    pub fn schedule_fired(&self, namespace: &str) {
        self.inner
            .schedules_fired
            .with_label_values(&[namespace])
            .inc();
    }
}

fn build_metrics_inner() -> Result<MetricsInner, MetricsError> {
    let registry = Registry::new();
    let workflows_started = IntCounterVec::new(
        Opts::new(
            "aion_workflows_started_total",
            "Total workflow executions started by namespace and workflow type.",
        ),
        &["namespace", "workflow_type"],
    )?;
    let workflows_completed = IntCounterVec::new(
        Opts::new(
            "aion_workflows_completed_total",
            "Total workflow executions that reached a terminal status by namespace and status.",
        ),
        &["namespace", "status"],
    )?;
    let activities_dispatched = IntCounterVec::new(
        Opts::new(
            "aion_activities_dispatched_total",
            "Total activities dispatched to workers by namespace and activity type.",
        ),
        &["namespace", "activity_type"],
    )?;
    let activities_completed = IntCounterVec::new(
        Opts::new(
            "aion_activities_completed_total",
            "Total activity results received by namespace and outcome.",
        ),
        &["namespace", "outcome"],
    )?;
    let activity_duration = HistogramVec::new(
        HistogramOpts::new(
            "aion_activity_duration_seconds",
            "Wall-clock activity execution latency from dispatch to result by namespace and activity type.",
        )
        .buckets(vec![
            0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0, 30.0,
        ]),
        &["namespace", "activity_type"],
    )?;
    let store_operation_duration = HistogramVec::new(
        HistogramOpts::new(
            "aion_store_operation_duration_seconds",
            "Store operation latency by operation.",
        )
        .buckets(vec![
            0.001, 0.0025, 0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0,
        ]),
        &["operation"],
    )?;
    let connected_workers = IntGaugeVec::new(
        Opts::new(
            "aion_connected_workers",
            "Current connected worker streams by namespace.",
        ),
        &["namespace"],
    )?;
    let inflight_activities = IntGaugeVec::new(
        Opts::new(
            "aion_inflight_activities",
            "Current dispatched activities awaiting worker completion by namespace.",
        ),
        &["namespace"],
    )?;
    let signals_delivered = IntCounterVec::new(
        Opts::new(
            "aion_signals_delivered_total",
            "Total signals delivered by namespace and residency classification.",
        ),
        &["namespace", "residency"],
    )?;
    let schedules_fired = IntCounterVec::new(
        Opts::new(
            "aion_schedules_fired_total",
            "Total schedule timer evaluations that started a workflow by namespace.",
        ),
        &["namespace"],
    )?;

    Ok(MetricsInner {
        registry,
        workflows_started,
        workflows_completed,
        activities_dispatched,
        activities_completed,
        activity_duration,
        store_operation_duration,
        connected_workers,
        inflight_activities,
        signals_delivered,
        schedules_fired,
    })
}

/// Pre-initialize known label sets of the two histogram families so they appear in the
/// prometheus text output before any store or activity traffic occurs.
///
/// `with_label_values` creates a child series on first use, so touching a label set here
/// exports it with zero observations. Counter and gauge families are not touched; they
/// only appear once something is first recorded into them.
fn initialize_default_label_sets(inner: &MetricsInner) {
    // Keep in sync with the operation names passed to `observe_since` by
    // `InstrumentedEventStore`. `read_history_from` is observed there too, so it is
    // pre-created alongside the other operations.
    const STORE_OPERATIONS: [&str; 5] = [
        "append",
        "read_history",
        "read_history_from",
        "list_active",
        "list_workflow_ids",
    ];
    for operation in STORE_OPERATIONS {
        inner
            .store_operation_duration
            .with_label_values(&[operation]);
    }
    // Placeholder child so the activity-duration family is exported before the first
    // activity completes. It shows up as a real
    // `namespace="default", activity_type="default"` series.
    inner
        .activity_duration
        .with_label_values(&["default", "default"]);
}

/// Axum handler for `/metrics`.
pub async fn metrics_handler(
    axum::extract::State(metrics): axum::extract::State<Metrics>,
) -> Response {
    match metrics.encode() {
        Ok(body) => {
            let mut response = body.into_response();
            response
                .headers_mut()
                .insert(CONTENT_TYPE, HeaderValue::from_static(TEXT_FORMAT));
            response
        }
        Err(error) => (StatusCode::INTERNAL_SERVER_ERROR, error.to_string()).into_response(),
    }
}

/// Decorator around an [`EventStore`] that records store-operation latency and derives
/// lifecycle metrics from appended events, without changing engine crates.
pub struct InstrumentedEventStore {
    /// Store that every call is forwarded to.
    inner: Arc<dyn EventStore>,
    /// Metrics handle that receives latency observations and lifecycle counts.
    metrics: Metrics,
    /// Namespace label attached to every lifecycle metric this wrapper records.
    namespace: String,
}

impl InstrumentedEventStore {
    /// Wrap `inner` so its calls report into `metrics`, with every lifecycle metric
    /// attributed to `namespace`.
    #[must_use]
    pub fn new(inner: Arc<dyn EventStore>, metrics: Metrics, namespace: impl Into<String>) -> Self {
        let namespace = namespace.into();
        Self {
            inner,
            metrics,
            namespace,
        }
    }

    /// Translate appended events into lifecycle counters.
    ///
    /// Counts workflow starts, terminal transitions (including continue-as-new), received
    /// signals and schedule triggers. All other event kinds are ignored.
    fn record_events(&self, events: &[Event]) {
        let namespace = self.namespace.as_str();
        for event in events {
            let terminal_status = match event {
                Event::WorkflowStarted { workflow_type, .. } => {
                    self.metrics
                        .workflow_started(namespace, workflow_type.as_str());
                    continue;
                }
                Event::SignalReceived { .. } => {
                    // NOTE(review): always labelled "resident"; the actual residency is not
                    // visible at this layer. Confirm this is intended.
                    self.metrics.signal_delivered(namespace, "resident");
                    continue;
                }
                Event::ScheduleTriggered { .. } => {
                    self.metrics.schedule_fired(namespace);
                    continue;
                }
                Event::WorkflowCompleted { .. } => "completed",
                Event::WorkflowFailed { .. } => "failed",
                Event::WorkflowCancelled { .. } => "cancelled",
                Event::WorkflowTimedOut { .. } => "timed_out",
                Event::WorkflowContinuedAsNew { .. } => "continued_as_new",
                _ => continue,
            };
            self.metrics.workflow_completed(namespace, terminal_status);
        }
    }

    /// Observe the time elapsed since `started` under the store `operation` label.
    fn observe_since(&self, operation: &str, started: Instant) {
        let elapsed = started.elapsed();
        self.metrics.store_operation(operation, elapsed);
    }
}

impl std::fmt::Debug for InstrumentedEventStore {
    /// Show only the namespace. The wrapped store and the metrics handle are elided, and
    /// the elision is marked with `..`.
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut builder = formatter.debug_struct("InstrumentedEventStore");
        builder.field("namespace", &self.namespace);
        builder.finish_non_exhaustive()
    }
}

#[async_trait]
impl WritableEventStore for InstrumentedEventStore {
    /// Forward `append` to the wrapped store and observe its latency as `append`.
    ///
    /// Lifecycle metrics are derived from `events` only when the wrapped append returns
    /// `Ok`. The wrapped store's result is returned unchanged.
    async fn append(
        &self,
        token: WriteToken,
        workflow_id: &WorkflowId,
        events: &[Event],
        expected_seq: u64,
    ) -> Result<(), StoreError> {
        let started = Instant::now();
        let outcome = self
            .inner
            .append(token, workflow_id, events, expected_seq)
            .await;
        self.observe_since("append", started);
        match outcome {
            Ok(()) => {
                self.record_events(events);
                Ok(())
            }
            Err(error) => Err(error),
        }
    }
}

/// Every read and timer operation is forwarded unchanged to the wrapped store.
///
/// Each call's latency is observed under an operation label matching the method name,
/// whether the call succeeds or fails. Previously `read_run_chain`, `query`,
/// `schedule_timer` and `expired_timers` bypassed instrumentation, unlike the other store
/// operations.
#[async_trait]
impl ReadableEventStore for InstrumentedEventStore {
    async fn read_history(&self, workflow_id: &WorkflowId) -> Result<Vec<Event>, StoreError> {
        let started = Instant::now();
        let result = self.inner.read_history(workflow_id).await;
        self.observe_since("read_history", started);
        result
    }

    async fn read_history_from(
        &self,
        workflow_id: &WorkflowId,
        from_seq: u64,
    ) -> Result<Vec<Event>, StoreError> {
        let started = Instant::now();
        let result = self.inner.read_history_from(workflow_id, from_seq).await;
        self.observe_since("read_history_from", started);
        result
    }

    async fn read_run_chain(
        &self,
        workflow_id: &WorkflowId,
    ) -> Result<Vec<RunSummary>, StoreError> {
        let started = Instant::now();
        let result = self.inner.read_run_chain(workflow_id).await;
        self.observe_since("read_run_chain", started);
        result
    }

    async fn list_workflow_ids(&self) -> Result<Vec<WorkflowId>, StoreError> {
        let started = Instant::now();
        let result = self.inner.list_workflow_ids().await;
        self.observe_since("list_workflow_ids", started);
        result
    }

    async fn list_active(&self) -> Result<Vec<WorkflowId>, StoreError> {
        let started = Instant::now();
        let result = self.inner.list_active().await;
        self.observe_since("list_active", started);
        result
    }

    async fn query(&self, filter: &WorkflowFilter) -> Result<Vec<WorkflowSummary>, StoreError> {
        let started = Instant::now();
        let result = self.inner.query(filter).await;
        self.observe_since("query", started);
        result
    }

    async fn schedule_timer(
        &self,
        workflow_id: &WorkflowId,
        timer_id: &TimerId,
        fire_at: DateTime<Utc>,
    ) -> Result<(), StoreError> {
        let started = Instant::now();
        let result = self
            .inner
            .schedule_timer(workflow_id, timer_id, fire_at)
            .await;
        self.observe_since("schedule_timer", started);
        result
    }

    async fn expired_timers(&self, as_of: DateTime<Utc>) -> Result<Vec<TimerEntry>, StoreError> {
        let started = Instant::now();
        let result = self.inner.expired_timers(as_of).await;
        self.observe_since("expired_timers", started);
        result
    }
}