mabi-runtime 1.4.0

Mabinogion shared runtime contracts and service orchestration
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
use std::collections::BTreeMap;
use std::sync::Arc;

use async_trait::async_trait;
use chrono::{DateTime, Utc};
use parking_lot::Mutex;
use serde::{Deserialize, Serialize};
use serde_json::Value as JsonValue;
use thiserror::Error;
use tokio::task::{AbortHandle, JoinError, JoinHandle};
use tokio::time::{timeout, Duration};
use tokio_util::sync::CancellationToken;

use mabi_core::Protocol;

use crate::device::DeviceRegistry;

/// Runtime-level result type.
///
/// Shorthand for `Result<T, RuntimeError>`, used by every fallible API in this module.
pub type RuntimeResult<T> = Result<T, RuntimeError>;

/// Runtime-level errors.
///
/// `Display` output is generated by `thiserror` from each variant's `#[error]` text.
#[derive(Debug, Error)]
pub enum RuntimeError {
    /// Generic service failure with a free-form message (see [`RuntimeError::service`]).
    #[error("service error: {message}")]
    Service { message: String },

    /// Joining a spawned service task failed. Built from a tokio `JoinError` (which
    /// tokio returns when a task panics or is cancelled); `message` is its display text.
    #[error("service task failed: {message}")]
    TaskJoin { message: String },

    /// `ServiceHandle::readiness` gave up waiting. `seconds` is the configured wait
    /// truncated to whole seconds, so sub-second waits report `0`.
    #[error("service readiness timed out after {seconds}s")]
    ReadinessTimeout { seconds: u64 },
}

impl RuntimeError {
    /// Convenience constructor for message-based errors.
    pub fn service(message: impl Into<String>) -> Self {
        Self::Service {
            message: message.into(),
        }
    }
}

impl From<JoinError> for RuntimeError {
    fn from(error: JoinError) -> Self {
        Self::TaskJoin {
            message: error.to_string(),
        }
    }
}

/// Shared service lifecycle states.
///
/// Serialized in `snake_case` (for example `"running"` or `"stopped"`). The default
/// is [`ServiceState::Idle`]. `Stopped` and `Error` are the states that
/// `ServiceStatus::is_terminal` reports as terminal.
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq, Default)]
#[serde(rename_all = "snake_case")]
pub enum ServiceState {
    #[default]
    Idle,
    Starting,
    Running,
    Stopping,
    Stopped,
    Error,
}

/// Current service status snapshot.
///
/// Returned synchronously by `ManagedService::status`; `ServiceHandle::readiness`
/// polls it until `ready` is set or the state is terminal.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServiceStatus {
    /// Service name.
    pub name: String,
    /// Protocol served, if any.
    pub protocol: Option<Protocol>,
    /// Current lifecycle state.
    pub state: ServiceState,
    /// Whether the service reports itself ready; checked by readiness polling.
    pub ready: bool,
    /// Start time, if the service implementation recorded one.
    pub started_at: Option<DateTime<Utc>>,
    /// Most recent error message, if the service implementation recorded one.
    pub last_error: Option<String>,
}

impl ServiceStatus {
    /// Creates a fresh idle status.
    pub fn new(name: impl Into<String>) -> Self {
        Self {
            name: name.into(),
            protocol: None,
            state: ServiceState::Idle,
            ready: false,
            started_at: None,
            last_error: None,
        }
    }

    /// Returns true when the service is terminal.
    pub fn is_terminal(&self) -> bool {
        matches!(self.state, ServiceState::Stopped | ServiceState::Error)
    }
}

/// Structured snapshot used by the CLI and tests.
///
/// Pairs a [`ServiceStatus`] with free-form JSON metadata keyed by name.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ServiceSnapshot {
    /// Service name.
    pub name: String,
    /// Protocol served, if any.
    pub protocol: Option<Protocol>,
    /// Status captured for this snapshot.
    pub status: ServiceStatus,
    /// Extra service-specific values, ordered by key. Deserializes to an empty map
    /// when the field is absent from the input.
    #[serde(default)]
    pub metadata: BTreeMap<String, JsonValue>,
}

impl ServiceSnapshot {
    /// Creates a snapshot for `name` with an idle [`ServiceStatus`] of the same name,
    /// no protocol, and no metadata.
    pub fn new(name: impl Into<String>) -> Self {
        let name: String = name.into();
        let status = ServiceStatus::new(name.as_str());
        Self {
            name,
            protocol: None,
            status,
            metadata: BTreeMap::new(),
        }
    }

    /// Builder-style helper that stores `value` under `key`, replacing any value
    /// previously stored under the same key, and returns the updated snapshot.
    pub fn with_metadata(self, key: impl Into<String>, value: JsonValue) -> Self {
        let mut snapshot = self;
        snapshot.metadata.insert(key.into(), value);
        snapshot
    }
}

/// Events emitted by the shared runtime context.
///
/// Serialized as an internally tagged object whose `type` field is the snake_case
/// variant name, e.g. `{"type":"state_changed","state":"running"}`.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ServiceEvent {
    /// Carries a lifecycle `state`.
    StateChanged { state: ServiceState },
    /// Sent by `ServiceContext::cancel`.
    Cancelled,
    /// Carries a free-form text message.
    Message { message: String },
}

/// A task registered with a `ServiceContext`: its label plus the handle used to abort it.
#[derive(Debug, Clone)]
struct TrackedTask {
    /// Human-readable label reported by `ServiceContext::tracked_tasks`.
    label: String,
    /// Abort handle taken from the task's `JoinHandle`.
    abort: AbortHandle,
}

/// State shared by every clone of a `ServiceContext`.
#[derive(Debug)]
struct ServiceContextInner {
    name: String,
    protocol: Option<Protocol>,
    /// Time the context was created (not necessarily when `serve` began).
    started_at: DateTime<Utc>,
    /// Root token; every token from `child_token` is derived from it.
    cancellation: CancellationToken,
    /// Broadcast sender for service events; receivers are created by `subscribe`.
    event_tx: tokio::sync::broadcast::Sender<ServiceEvent>,
    /// Tasks registered via `track_task`/`spawn_task`; aborted by `abort_tracked_tasks`.
    /// Synchronous lock, only held briefly and never across an `.await` in this module.
    tracked_tasks: Mutex<Vec<TrackedTask>>,
}

/// Shared runtime context provided to all managed services.
///
/// Cloning is cheap: all clones share one reference-counted inner state, so
/// cancellation, events, and tracked tasks are visible through every clone.
#[derive(Clone, Debug)]
pub struct ServiceContext {
    inner: Arc<ServiceContextInner>,
}

impl ServiceContext {
    /// Creates a new service context.
    pub fn new(name: impl Into<String>, protocol: Option<Protocol>) -> Self {
        let (event_tx, _) = tokio::sync::broadcast::channel(64);
        Self {
            inner: Arc::new(ServiceContextInner {
                name: name.into(),
                protocol,
                started_at: Utc::now(),
                cancellation: CancellationToken::new(),
                event_tx,
                tracked_tasks: Mutex::new(Vec::new()),
            }),
        }
    }

    /// Returns the service name.
    pub fn name(&self) -> &str {
        &self.inner.name
    }

    /// Returns the service protocol, if one exists.
    pub fn protocol(&self) -> Option<Protocol> {
        self.inner.protocol
    }

    /// Returns when the context was created.
    pub fn started_at(&self) -> DateTime<Utc> {
        self.inner.started_at
    }

    /// Returns the shared cancellation token.
    pub fn cancellation_token(&self) -> CancellationToken {
        self.inner.cancellation.clone()
    }

    /// Returns a child token for scoped tasks.
    pub fn child_token(&self) -> CancellationToken {
        self.inner.cancellation.child_token()
    }

    /// Cancels the context and all child scopes.
    pub fn cancel(&self) {
        self.inner.cancellation.cancel();
        let _ = self.emit(ServiceEvent::Cancelled);
    }

    /// Returns whether cancellation has been requested.
    pub fn is_cancelled(&self) -> bool {
        self.inner.cancellation.is_cancelled()
    }

    /// Subscribes to service events.
    pub fn subscribe(&self) -> tokio::sync::broadcast::Receiver<ServiceEvent> {
        self.inner.event_tx.subscribe()
    }

    /// Emits a service event.
    pub fn emit(
        &self,
        event: ServiceEvent,
    ) -> Result<usize, tokio::sync::broadcast::error::SendError<ServiceEvent>> {
        self.inner.event_tx.send(event)
    }

    /// Tracks an externally-spawned task under the context.
    pub fn track_task(&self, label: impl Into<String>, handle: &JoinHandle<()>) {
        self.inner.tracked_tasks.lock().push(TrackedTask {
            label: label.into(),
            abort: handle.abort_handle(),
        });
    }

    /// Spawns and tracks a unit-returning background task.
    pub fn spawn_task<F>(&self, label: impl Into<String>, future: F) -> JoinHandle<()>
    where
        F: std::future::Future<Output = ()> + Send + 'static,
    {
        let label = label.into();
        let handle = tokio::spawn(future);
        self.inner.tracked_tasks.lock().push(TrackedTask {
            label,
            abort: handle.abort_handle(),
        });
        handle
    }

    /// Returns the tracked task labels.
    pub fn tracked_tasks(&self) -> Vec<String> {
        self.inner
            .tracked_tasks
            .lock()
            .iter()
            .map(|task| task.label.clone())
            .collect()
    }

    /// Aborts all tracked tasks.
    pub fn abort_tracked_tasks(&self) {
        for task in self.inner.tracked_tasks.lock().iter() {
            task.abort.abort();
        }
    }
}

/// Shared lifecycle contract for protocol services.
///
/// `ServiceHandle::spawn` awaits `start` and then runs `serve` on its own tokio task
/// with an owned clone of the context. `ServiceHandle::stop` cancels the context
/// before calling `stop` and waits for the `serve` task to finish, so `serve` should
/// return once the context's cancellation token fires.
#[async_trait]
pub trait ManagedService: Send + Sync {
    /// Performs any non-blocking startup work.
    ///
    /// An error here makes `ServiceHandle::spawn` return before `serve` is spawned.
    async fn start(&self, context: &ServiceContext) -> RuntimeResult<()>;

    /// Requests a graceful stop.
    ///
    /// When invoked by `ServiceHandle::stop`, the context has already been cancelled.
    async fn stop(&self, context: &ServiceContext) -> RuntimeResult<()>;

    /// Runs the service until completion or cancellation.
    async fn serve(&self, context: ServiceContext) -> RuntimeResult<()>;

    /// Returns the current status.
    ///
    /// Synchronous; `ServiceHandle::readiness` calls it repeatedly while polling.
    fn status(&self) -> ServiceStatus;

    /// Returns a structured snapshot.
    async fn snapshot(&self) -> RuntimeResult<ServiceSnapshot>;

    /// Publishes any controller-visible device ports exposed by this service.
    ///
    /// The default implementation registers nothing and returns `Ok(())`.
    fn register_devices(&self, _registry: &DeviceRegistry) -> RuntimeResult<()> {
        Ok(())
    }
}

/// Shared handle for spawning, stopping, and inspecting managed services.
pub struct ServiceHandle {
    service: Arc<dyn ManagedService>,
    context: ServiceContext,
    /// Join handle of the spawned `serve` task. It is `None` until `spawn` runs and
    /// after `stop`/`wait` take it. The async mutex serializes those operations.
    task: Arc<tokio::sync::Mutex<Option<JoinHandle<RuntimeResult<()>>>>>,
}

impl ServiceHandle {
    /// Creates a new handle around a service and context.
    pub fn new(service: Arc<dyn ManagedService>, context: ServiceContext) -> Self {
        Self {
            service,
            context,
            task: Arc::new(tokio::sync::Mutex::new(None)),
        }
    }

    /// Creates a handle for a named service.
    pub fn named(
        name: impl Into<String>,
        protocol: Option<Protocol>,
        service: Arc<dyn ManagedService>,
    ) -> Self {
        Self::new(service, ServiceContext::new(name, protocol))
    }

    /// Returns the shared service context.
    pub fn context(&self) -> ServiceContext {
        self.context.clone()
    }

    /// Spawns the service task if it is not already running.
    pub async fn spawn(&self) -> RuntimeResult<()> {
        let mut guard = self.task.lock().await;
        if guard.is_some() {
            return Ok(());
        }

        self.service.start(&self.context).await?;

        let service = self.service.clone();
        let context = self.context.clone();
        *guard = Some(tokio::spawn(async move { service.serve(context).await }));
        Ok(())
    }

    /// Requests service shutdown and waits for the service task.
    pub async fn stop(&self) -> RuntimeResult<()> {
        self.context.cancel();
        self.service.stop(&self.context).await?;
        self.context.abort_tracked_tasks();

        if let Some(handle) = self.task.lock().await.take() {
            handle.await??;
        }

        Ok(())
    }

    /// Waits for the service task to finish if it was spawned.
    pub async fn wait(&self) -> RuntimeResult<()> {
        if let Some(handle) = self.task.lock().await.take() {
            handle.await??;
        }
        Ok(())
    }

    /// Waits until the service reports readiness or the timeout elapses.
    pub async fn readiness(&self, max_wait: Duration) -> RuntimeResult<ServiceStatus> {
        let service = self.service.clone();
        timeout(max_wait, async move {
            loop {
                let status = service.status();
                if status.ready || status.is_terminal() {
                    return status;
                }
                tokio::time::sleep(Duration::from_millis(25)).await;
            }
        })
        .await
        .map_err(|_| RuntimeError::ReadinessTimeout {
            seconds: max_wait.as_secs(),
        })
    }

    /// Returns the latest status.
    pub fn status(&self) -> ServiceStatus {
        self.service.status()
    }

    /// Returns the latest snapshot.
    pub async fn snapshot(&self) -> RuntimeResult<ServiceSnapshot> {
        self.service.snapshot().await
    }
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use async_trait::async_trait;
    use parking_lot::RwLock;
    use tokio::time::Duration;

    use crate::service::{
        ManagedService, RuntimeResult, ServiceContext, ServiceHandle, ServiceSnapshot,
        ServiceState, ServiceStatus,
    };

    /// In-memory service that walks its status through the lifecycle and parks in
    /// `serve` until the context is cancelled.
    struct TestService {
        status: RwLock<ServiceStatus>,
    }

    impl TestService {
        fn new() -> Self {
            let initial = ServiceStatus::new("test");
            Self {
                status: RwLock::new(initial),
            }
        }

        /// Applies `edit` under the write lock; the guard is released before returning.
        fn update(&self, edit: impl FnOnce(&mut ServiceStatus)) {
            let mut guard = self.status.write();
            edit(&mut *guard);
        }
    }

    /// Marks a status as stopped and no longer ready.
    fn mark_stopped(status: &mut ServiceStatus) {
        status.state = ServiceState::Stopped;
        status.ready = false;
    }

    #[async_trait]
    impl ManagedService for TestService {
        async fn start(&self, context: &ServiceContext) -> RuntimeResult<()> {
            let began = context.started_at();
            self.update(|status| {
                status.state = ServiceState::Starting;
                status.started_at = Some(began);
            });
            Ok(())
        }

        async fn stop(&self, _context: &ServiceContext) -> RuntimeResult<()> {
            self.update(mark_stopped);
            Ok(())
        }

        async fn serve(&self, context: ServiceContext) -> RuntimeResult<()> {
            self.update(|status| {
                status.state = ServiceState::Running;
                status.ready = true;
            });
            context.cancellation_token().cancelled().await;
            self.update(mark_stopped);
            Ok(())
        }

        fn status(&self) -> ServiceStatus {
            self.status.read().clone()
        }

        async fn snapshot(&self) -> RuntimeResult<ServiceSnapshot> {
            Ok(ServiceSnapshot {
                status: self.status(),
                ..ServiceSnapshot::new("test")
            })
        }
    }

    #[tokio::test]
    async fn handle_spawns_and_stops_service() {
        let handle = ServiceHandle::named("test", None, Arc::new(TestService::new()));

        handle.spawn().await.unwrap();
        let observed = handle.readiness(Duration::from_secs(1)).await.unwrap();
        assert!(observed.ready);

        handle.stop().await.unwrap();
        assert_eq!(handle.status().state, ServiceState::Stopped);
    }
}