solti-model 0.0.2

Solti SDK domain model.
Documentation
//! Output streaming types for live tail of task stdout/stderr.

use std::time::SystemTime;

use bytes::Bytes;
use serde::{Deserialize, Serialize};

/// Identifies which standard stream a piece of task output was read from.
///
/// Serialized as a lowercase string: `"stdout"` or `"stderr"`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum StreamKind {
    /// The task's standard output stream.
    Stdout,
    /// The task's standard error stream.
    Stderr,
}

/// One event in the live-tail stream of a task.
///
/// Carries either an output line, a run boundary marker, or a backpressure signal.
/// Wire format is JSON, internally tagged on `type`; both the tag values and the
/// field names are camelCase:
///
/// ```text
/// {"type":"chunk","attempt":1,"stream":"stdout","seq":0,"ts":1700,"line":"..."}
/// {"type":"runStarted","attempt":1,"startedAt":1700}
/// {"type":"runFinished","attempt":1,"exitCode":0,"finishedAt":1701}
/// {"type":"lagged","skipped":42}
/// ```
///
/// `exitCode` is left out of `runFinished` entirely when it is `None`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(tag = "type", rename_all = "camelCase")]
pub enum OutputEvent {
    /// One line of stdout/stderr from the currently active run.
    ///
    /// The chunk's own fields sit next to the `type` tag in the JSON object.
    Chunk(OutputChunk),

    /// A new run attempt has started; sequence numbers reset from this point on.
    #[serde(rename_all = "camelCase")]
    RunStarted {
        /// Attempt number of the run that has just started.
        attempt: u32,
        /// Start time of the run (`startedAt`, unix milliseconds on the wire).
        #[serde(with = "crate::resource::metadata::time_serde")]
        started_at: SystemTime,
    },

    /// The current run finished. Consumers can stop accumulating chunks for this attempt.
    #[serde(rename_all = "camelCase")]
    RunFinished {
        /// Attempt number of the run that has just finished.
        attempt: u32,
        /// Exit code of the run, if one is available; omitted from the JSON when `None`.
        #[serde(skip_serializing_if = "Option::is_none")]
        exit_code: Option<i32>,
        /// Finish time of the run (`finishedAt`, unix milliseconds on the wire).
        #[serde(with = "crate::resource::metadata::time_serde")]
        finished_at: SystemTime,
    },

    /// Subscriber fell behind the broadcast ring window.
    ///
    /// `skipped` is the count reported for the gap.
    // NOTE(review): presumably the number of events the subscriber missed — confirm
    // against the code that produces this variant.
    Lagged { skipped: u64 },
}

/// One line of output from a single task-run attempt.
///
/// Carried through `tokio::sync::broadcast` channels in-process;
/// sent to clients via SSE / gRPC server-stream.
///
/// Field names are camelCase on the wire; `ts` and `line` go through dedicated
/// serde adapters (see the attributes on each field).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "camelCase")]
pub struct OutputChunk {
    /// Which attempt of the task this chunk belongs to (matches [`TaskRun::attempt`]).
    ///
    /// [`TaskRun::attempt`]: crate::TaskRun::attempt
    pub attempt: u32,
    /// Stream the line was read from: stdout or stderr.
    pub stream: StreamKind,
    /// Monotonic sequence number within this attempt; resets on next run.
    pub seq: u64,
    /// Wall-clock time the line was read by the agent (unix milliseconds on the wire).
    #[serde(with = "crate::resource::metadata::time_serde")]
    pub ts: SystemTime,
    /// One line, already truncated/cleaned by the runner.
    ///
    /// Held as `Bytes` in memory but written to JSON as a string rather than a
    /// byte array.
    #[serde(with = "bytes_as_utf8_string")]
    pub line: Bytes,
}

/// Serde adapter: serialize `Bytes` as a UTF-8 string in JSON, deserialize from a JSON string back into `Bytes`.
mod bytes_as_utf8_string {
    use bytes::Bytes;
    use serde::{Deserialize, Deserializer, Serializer};

    pub(super) fn serialize<S>(b: &Bytes, s: S) -> Result<S::Ok, S::Error>
    where
        S: Serializer,
    {
        let txt = std::str::from_utf8(b).map_err(serde::ser::Error::custom)?;
        s.serialize_str(txt)
    }

    pub(super) fn deserialize<'de, D>(d: D) -> Result<Bytes, D::Error>
    where
        D: Deserializer<'de>,
    {
        let s = String::deserialize(d)?;
        Ok(Bytes::from(s))
    }
}

/// Wire-format and cloning tests for the output streaming types.
#[cfg(test)]
mod tests {
    use super::*;

    use std::time::{Duration, UNIX_EPOCH};

    #[test]
    fn stream_kind_stdout_serializes_to_lowercase() {
        let json = serde_json::to_string(&StreamKind::Stdout).unwrap();
        assert_eq!(json, "\"stdout\"");
    }

    #[test]
    fn stream_kind_stderr_serializes_to_lowercase() {
        let json = serde_json::to_string(&StreamKind::Stderr).unwrap();
        assert_eq!(json, "\"stderr\"");
    }

    #[test]
    fn stream_kind_deserializes_from_lowercase() {
        let out: StreamKind = serde_json::from_str("\"stdout\"").unwrap();
        let err: StreamKind = serde_json::from_str("\"stderr\"").unwrap();
        assert_eq!(out, StreamKind::Stdout);
        assert_eq!(err, StreamKind::Stderr);
    }

    #[test]
    fn output_chunk_roundtrips_through_json() {
        let chunk = OutputChunk {
            attempt: 7,
            stream: StreamKind::Stderr,
            seq: 42,
            ts: UNIX_EPOCH + Duration::from_millis(1_700_000_000_000),
            line: Bytes::from_static(b"compiling foo..."),
        };

        let json = serde_json::to_string(&chunk).unwrap();
        let back: OutputChunk = serde_json::from_str(&json).unwrap();

        assert_eq!(back, chunk);
    }

    #[test]
    fn output_chunk_serializes_ts_as_unix_milliseconds() {
        let chunk = OutputChunk {
            attempt: 1,
            stream: StreamKind::Stdout,
            seq: 0,
            ts: UNIX_EPOCH + Duration::from_millis(1234),
            line: Bytes::from_static(b"x"),
        };

        let json = serde_json::to_string(&chunk).unwrap();
        assert!(
            json.contains(r#""ts":1234"#),
            "ts must serialize as unix milliseconds; got {json}"
        );
    }

    #[test]
    fn output_chunk_serializes_line_as_utf8_string_not_array() {
        let chunk = OutputChunk {
            attempt: 1,
            stream: StreamKind::Stdout,
            seq: 0,
            ts: UNIX_EPOCH,
            line: Bytes::from_static(b"hello"),
        };
        let json = serde_json::to_string(&chunk).unwrap();
        assert!(
            json.contains(r#""line":"hello""#),
            "line must serialize as JSON string, not byte array; got {json}"
        );
    }

    #[test]
    fn output_event_chunk_inlines_chunk_fields() {
        let event = OutputEvent::Chunk(OutputChunk {
            attempt: 3,
            stream: StreamKind::Stdout,
            seq: 5,
            ts: UNIX_EPOCH + Duration::from_millis(1_700_000_000_000),
            line: Bytes::from_static(b"hello"),
        });
        let json = serde_json::to_string(&event).unwrap();

        assert!(json.contains(r#""type":"chunk""#), "missing tag in {json}");
        assert!(json.contains(r#""attempt":3"#), "{json}");
        assert!(json.contains(r#""stream":"stdout""#), "{json}");
        assert!(json.contains(r#""line":"hello""#), "{json}");
    }

    #[test]
    fn output_event_run_started_carries_attempt_and_ts() {
        let event = OutputEvent::RunStarted {
            attempt: 2,
            started_at: UNIX_EPOCH + Duration::from_millis(1234),
        };
        let json = serde_json::to_string(&event).unwrap();

        assert!(json.contains(r#""type":"runStarted""#), "{json}");
        assert!(json.contains(r#""attempt":2"#), "{json}");
        assert!(json.contains(r#""startedAt":1234"#), "{json}");
    }

    #[test]
    fn output_event_run_finished_carries_exit_code() {
        let event = OutputEvent::RunFinished {
            attempt: 2,
            exit_code: Some(0),
            finished_at: UNIX_EPOCH + Duration::from_millis(2222),
        };
        let json = serde_json::to_string(&event).unwrap();

        assert!(json.contains(r#""type":"runFinished""#), "{json}");
        assert!(json.contains(r#""exitCode":0"#), "{json}");
        assert!(json.contains(r#""finishedAt":2222"#), "{json}");
    }

    #[test]
    fn output_event_run_finished_omits_missing_exit_code() {
        let event = OutputEvent::RunFinished {
            attempt: 4,
            exit_code: None,
            finished_at: UNIX_EPOCH + Duration::from_millis(3333),
        };
        let json = serde_json::to_string(&event).unwrap();

        // `None` must not appear as `"exitCode":null`; the key is dropped.
        assert!(!json.contains("exitCode"), "{json}");
        assert!(json.contains(r#""finishedAt":3333"#), "{json}");

        // The absent key must still deserialize back to `None`.
        let back: OutputEvent = serde_json::from_str(&json).unwrap();
        assert_eq!(back, event, "roundtrip failed for {json}");
    }

    #[test]
    fn output_event_lagged_carries_skipped_count() {
        let event = OutputEvent::Lagged { skipped: 1500 };
        let json = serde_json::to_string(&event).unwrap();

        assert!(json.contains(r#""type":"lagged""#), "{json}");
        assert!(json.contains(r#""skipped":1500"#), "{json}");
    }

    #[test]
    fn output_event_roundtrips_through_json() {
        let cases = [
            OutputEvent::Chunk(OutputChunk {
                attempt: 1,
                stream: StreamKind::Stderr,
                seq: 0,
                ts: UNIX_EPOCH + Duration::from_millis(1_700_000_000_000),
                line: Bytes::from_static(b"warning"),
            }),
            OutputEvent::RunStarted {
                attempt: 1,
                started_at: UNIX_EPOCH + Duration::from_millis(1_700_000_000_000),
            },
            OutputEvent::RunFinished {
                attempt: 1,
                exit_code: Some(42),
                finished_at: UNIX_EPOCH + Duration::from_millis(1_700_000_001_000),
            },
            OutputEvent::Lagged { skipped: 7 },
        ];

        for original in cases {
            let json = serde_json::to_string(&original).unwrap();
            let back: OutputEvent = serde_json::from_str(&json).unwrap();
            assert_eq!(back, original, "roundtrip failed for {json}");
        }
    }

    #[test]
    fn output_chunk_uses_camel_case_keys() {
        let chunk = OutputChunk {
            attempt: 2,
            stream: StreamKind::Stdout,
            seq: 9,
            ts: UNIX_EPOCH,
            line: Bytes::from_static(b"hi"),
        };

        let json = serde_json::to_string(&chunk).unwrap();
        for key in [
            r#""attempt":"#,
            r#""stream":"#,
            r#""seq":"#,
            r#""ts":"#,
            r#""line":"#,
        ] {
            assert!(json.contains(key), "missing key {key} in {json}");
        }
    }

    #[test]
    fn output_chunk_clone_is_refcount_bump() {
        // Use a heap-backed buffer: a `from_static` buffer is never refcounted,
        // so it cannot exercise the sharing path this test is named after.
        let original = OutputChunk {
            attempt: 1,
            stream: StreamKind::Stdout,
            seq: 0,
            ts: UNIX_EPOCH,
            line: Bytes::from(b"shared-line".to_vec()),
        };
        let cloned = original.clone();

        // Same backing storage means the clone did not copy the payload.
        assert_eq!(original.line.as_ptr(), cloned.line.as_ptr());
        assert_eq!(original.line.len(), cloned.line.len());
        assert_eq!(cloned, original);
    }
}