sttp-core-rs 0.1.5

Core STTP parsing, validation, storage contracts, and application services for Rust
Documentation
use chrono::{DateTime, Utc};
use sttp_core_rs::domain::contracts::NodeStore;
use sttp_core_rs::domain::models::{
    AvecState, ConnectorMetadata, NodeUpsertStatus, SttpNode, SyncCheckpoint,
    SyncCursor,
};
use sttp_core_rs::storage::InMemoryNodeStore;

fn build_test_node(session_id: &str) -> SttpNode {
    SttpNode {
        raw: "raw".to_string(),
        session_id: session_id.to_string(),
        tier: "raw".to_string(),
        timestamp: DateTime::parse_from_rfc3339("2026-03-05T06:30:00Z")
            .expect("timestamp should parse")
            .with_timezone(&Utc),
        compression_depth: 1,
        parent_node_id: None,
        sync_key: String::new(),
        updated_at: DateTime::parse_from_rfc3339("2026-03-05T06:30:00Z")
            .expect("timestamp should parse")
            .with_timezone(&Utc),
        source_metadata: None,
        user_avec: AvecState {
            stability: 0.85,
            friction: 0.25,
            logic: 0.80,
            autonomy: 0.70,
        },
        model_avec: AvecState {
            stability: 0.91,
            friction: 0.21,
            logic: 0.90,
            autonomy: 0.80,
        },
        compression_avec: Some(AvecState::zero()),
        rho: 0.96,
        kappa: 0.94,
        psi: 2.6,
    }
}

#[tokio::test(flavor = "current_thread")]
async fn duplicate_upsert_does_not_create_extra_rows() {
    let store = InMemoryNodeStore::new();

    let first = store
        .upsert_node_async(build_test_node("sync-session"))
        .await
        .expect("first upsert should succeed");
    let second = store
        .upsert_node_async(build_test_node("sync-session"))
        .await
        .expect("second upsert should succeed");

    assert_eq!(first.status, NodeUpsertStatus::Created);
    assert_eq!(second.status, NodeUpsertStatus::Duplicate);

    let nodes = store
        .query_nodes_async(sttp_core_rs::domain::models::NodeQuery {
            limit: 10,
            session_id: Some("sync-session".to_string()),
            from_utc: None,
            to_utc: None,
        })
        .await
        .expect("query should succeed");
    assert_eq!(nodes.len(), 1);
}

#[tokio::test(flavor = "current_thread")]
async fn change_query_returns_incremental_cursor() {
    let store = InMemoryNodeStore::new();

    let mut first = build_test_node("sync-session");
    first.updated_at = DateTime::parse_from_rfc3339("2026-03-05T06:31:00Z")
        .expect("timestamp should parse")
        .with_timezone(&Utc);
    first.sync_key = "sync-a".to_string();

    let mut second = build_test_node("sync-session");
    second.raw = "raw-b".to_string();
    second.timestamp = DateTime::parse_from_rfc3339("2026-03-05T06:32:00Z")
        .expect("timestamp should parse")
        .with_timezone(&Utc);
    second.updated_at = DateTime::parse_from_rfc3339("2026-03-05T06:33:00Z")
        .expect("timestamp should parse")
        .with_timezone(&Utc);
    second.sync_key = "sync-b".to_string();

    let first_result = store
        .upsert_node_async(first)
        .await
        .expect("first upsert should succeed");
    let second_result = store
        .upsert_node_async(second)
        .await
        .expect("second upsert should succeed");

    let result = store
        .query_changes_since_async(
            "sync-session",
            Some(SyncCursor {
                updated_at: first_result.updated_at,
                sync_key: first_result.sync_key,
            }),
            10,
        )
        .await
        .expect("change query should succeed");

    assert_eq!(result.nodes.len(), 1);
    assert_eq!(result.nodes[0].sync_key, second_result.sync_key);
}

#[tokio::test(flavor = "current_thread")]
async fn checkpoints_replace_existing_connector_state() {
    let store = InMemoryNodeStore::new();

    store
        .put_checkpoint_async(SyncCheckpoint {
            session_id: "sync-session".to_string(),
            connector_id: "cloud-primary".to_string(),
            cursor: Some(SyncCursor {
                updated_at: DateTime::parse_from_rfc3339("2026-03-05T06:35:00Z")
                    .expect("timestamp should parse")
                    .with_timezone(&Utc),
                sync_key: "sync-a".to_string(),
            }),
            updated_at: DateTime::parse_from_rfc3339("2026-03-05T06:36:00Z")
                .expect("timestamp should parse")
                .with_timezone(&Utc),
            metadata: Some(ConnectorMetadata {
                connector_id: "cloud-primary".to_string(),
                source_kind: "local".to_string(),
                upstream_id: "node-a".to_string(),
                revision: Some("1".to_string()),
                observed_at_utc: DateTime::parse_from_rfc3339("2026-03-05T06:36:00Z")
                    .expect("timestamp should parse")
                    .with_timezone(&Utc),
                extra: Some(serde_json::json!({ "endpoint": "local" })),
            }),
        })
        .await
        .expect("checkpoint insert should succeed");

    store
        .put_checkpoint_async(SyncCheckpoint {
            session_id: "sync-session".to_string(),
            connector_id: "cloud-primary".to_string(),
            cursor: Some(SyncCursor {
                updated_at: DateTime::parse_from_rfc3339("2026-03-05T06:40:00Z")
                    .expect("timestamp should parse")
                    .with_timezone(&Utc),
                sync_key: "sync-b".to_string(),
            }),
            updated_at: DateTime::parse_from_rfc3339("2026-03-05T06:41:00Z")
                .expect("timestamp should parse")
                .with_timezone(&Utc),
            metadata: Some(ConnectorMetadata {
                connector_id: "cloud-primary".to_string(),
                source_kind: "cloud".to_string(),
                upstream_id: "node-b".to_string(),
                revision: Some("2".to_string()),
                observed_at_utc: DateTime::parse_from_rfc3339("2026-03-05T06:41:00Z")
                    .expect("timestamp should parse")
                    .with_timezone(&Utc),
                extra: Some(serde_json::json!({ "endpoint": "cloud" })),
            }),
        })
        .await
        .expect("checkpoint update should succeed");

    let checkpoint = store
        .get_checkpoint_async("sync-session", "cloud-primary")
        .await
        .expect("checkpoint query should succeed")
        .expect("checkpoint should exist");

    assert_eq!(checkpoint.cursor.as_ref().map(|cursor| cursor.sync_key.as_str()), Some("sync-b"));
    assert_eq!(
        checkpoint.metadata.as_ref().map(|metadata| metadata.source_kind.as_str()),
        Some("cloud")
    );
    assert_eq!(
        checkpoint.metadata.as_ref().and_then(|metadata| metadata.revision.as_deref()),
        Some("2")
    );
}