// xynthe 0.1.0
//
// A unified orchestration framework for autonomous intelligence with temporal continuity.
// Documentation
//! Thought Streams - Structured reasoning pathways
//!
//! Thought streams provide typed channels carrying structured cognitive events.
//! They enable agents to maintain multiple reasoning threads simultaneously,
//! supporting parallel hypothesis tracking and non-linear reasoning.

use crate::types::{ProvenanceChain, StructuredContent, Timestamp};
use crate::Result;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};
use uuid::Uuid;

/// Category of a [`ThoughtEvent`], identifying what kind of cognitive step it records.
///
/// The type is `Copy` and comparable with `==`. With serde, variants are
/// written in `snake_case` (for example `Observation` becomes `"observation"`).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ThoughtEventType {
    /// Observation from perception
    Observation,
    /// Hypothesis or assumption
    Hypothesis,
    /// Intention or goal
    Intention,
    /// Reflection on past events
    Reflection,
    /// Tool invocation
    Action,
    /// Warning or error
    Warning,
    /// Confirmation of success
    Success,
}

/// Represents a single cognitive event in a thought stream.
///
/// Events are normally built through [`ThoughtEvent::new`] or one of the
/// per-type constructors, which fill in `id` and `timestamp` automatically.
/// All fields are public, so values constructed literally or deserialized
/// bypass any normalization those constructors perform.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ThoughtEvent {
    /// Unique identifier
    pub id: Uuid,

    /// Event creation timestamp
    pub timestamp: Timestamp,

    /// Type of thought event (serialized under the key `type`)
    #[serde(rename = "type")]
    pub event_type: ThoughtEventType,

    /// Event content
    pub content: StructuredContent,

    /// Provenance chain tracking origin
    pub provenance: ProvenanceChain,

    /// Confidence score, intended to lie between 0.0 and 1.0.
    ///
    /// The range is enforced by [`ThoughtEvent::new`], not by the type itself.
    pub confidence: f64,
}

impl ThoughtEvent {
    /// Create a new thought event.
    ///
    /// The event receives a freshly generated v4 UUID and the value of
    /// `Timestamp::now()` at the time of the call.
    ///
    /// `confidence` is normalized into `[0.0, 1.0]`: values below `0.0` become
    /// `0.0`, values above `1.0` become `1.0`, and `NaN` is stored as `0.0`.
    pub fn new(
        event_type: ThoughtEventType,
        content: StructuredContent,
        provenance: ProvenanceChain,
        confidence: f64,
    ) -> Self {
        // `f64::clamp` returns NaN for a NaN input, which would break the
        // documented range and make the event compare unequal to itself.
        // Treat "not a number" as "no confidence" instead.
        let confidence = if confidence.is_nan() {
            0.0
        } else {
            confidence.clamp(0.0, 1.0)
        };

        Self {
            id: Uuid::new_v4(),
            timestamp: Timestamp::now(),
            event_type,
            content,
            provenance,
            confidence,
        }
    }

    /// Create an observation event with full confidence (`1.0`).
    pub fn observation(content: StructuredContent, provenance: ProvenanceChain) -> Self {
        Self::new(ThoughtEventType::Observation, content, provenance, 1.0)
    }

    /// Create a hypothesis event with the given confidence.
    ///
    /// `confidence` is normalized exactly as in [`ThoughtEvent::new`].
    pub fn hypothesis(
        content: StructuredContent,
        provenance: ProvenanceChain,
        confidence: f64,
    ) -> Self {
        Self::new(ThoughtEventType::Hypothesis, content, provenance, confidence)
    }

    /// Create an intention event with full confidence (`1.0`).
    pub fn intention(content: StructuredContent, provenance: ProvenanceChain) -> Self {
        Self::new(ThoughtEventType::Intention, content, provenance, 1.0)
    }

    /// Create a reflection event with full confidence (`1.0`).
    pub fn reflection(content: StructuredContent, provenance: ProvenanceChain) -> Self {
        Self::new(ThoughtEventType::Reflection, content, provenance, 1.0)
    }

    /// Check if this event is a hypothesis
    pub fn is_hypothesis(&self) -> bool {
        matches!(self.event_type, ThoughtEventType::Hypothesis)
    }

    /// Check if this event has high confidence (strictly greater than `0.8`).
    pub fn is_high_confidence(&self) -> bool {
        self.confidence > 0.8
    }

    /// Check if this event has low confidence (strictly less than `0.5`).
    pub fn is_low_confidence(&self) -> bool {
        self.confidence < 0.5
    }
}

/// Stream operator trait for composing thought streams.
///
/// An operator is an asynchronous transformation over a batch of events.
/// Implementors must be `Send + Sync`. The `#[async_trait]` attribute is what
/// allows the `async fn` in this trait definition.
#[async_trait]
pub trait StreamOperator: Send + Sync {
    /// Apply the operator to a batch of events.
    ///
    /// Takes ownership of `events` and returns the resulting events, or an
    /// error if the operator fails.
    async fn apply(&self, events: Vec<ThoughtEvent>) -> Result<Vec<ThoughtEvent>>;
}

/// Thought stream for managing cognitive events.
///
/// A stream wraps both halves of a Tokio unbounded MPSC channel. Both halves
/// sit behind `Arc`, so cloning a `ThoughtStream` yields another handle to the
/// *same* channel rather than an independent copy: events emitted through any
/// clone can be received through any clone, and each event is delivered once.
#[derive(Clone)]
pub struct ThoughtStream {
    /// Stream name identifier
    name: String,

    /// Channel sender, shared between all clones of this stream
    sender: Arc<mpsc::UnboundedSender<ThoughtEvent>>,

    /// Channel receiver (wrapped in Arc<RwLock> for shared access).
    /// Receiving needs mutable access, so readers take the write lock.
    receiver: Arc<RwLock<mpsc::UnboundedReceiver<ThoughtEvent>>>,
}

impl ThoughtStream {
    /// Create a new thought stream with the given name
    pub fn new(name: impl Into<String>) -> Self {
        let (sender, receiver) = mpsc::unbounded_channel();
        Self {
            name: name.into(),
            sender: Arc::new(sender),
            receiver: Arc::new(RwLock::new(receiver)),
        }
    }

    /// Create a new stream that merges multiple streams
    pub fn merge(streams: Vec<Self>) -> Self {
        let merged = Self::new(format!("merged_{}", uuid::Uuid::new_v4()));

        for stream in streams {
            let sender = merged.sender.clone();
            tokio::spawn(async move {
                while let Ok(event) = stream.receive().await {
                    let _ = sender.send(event);
                }
            });
        }

        merged
    }

    /// Emit an event to the stream
    pub fn emit(&self, event: ThoughtEvent) -> Result<()> {
        self.sender
            .send(event)
            .map_err(|e| crate::Error::ThoughtStreamError(e.to_string()))
    }

    /// Emit an event asynchronously
    pub async fn emit_async(&self, event: ThoughtEvent) -> Result<()> {
        self.emit(event)
    }

    /// Receive an event from the stream
    pub async fn receive(&self) -> Result<ThoughtEvent> {
        let mut receiver = self.receiver.write().await;
        receiver
            .recv()
            .await
            .ok_or_else(|| crate::Error::ThoughtStreamError("Stream closed".into()))
    }

    /// Filter events from the stream
    pub fn filter(&self, predicate: fn(&ThoughtEvent) -> bool) -> Self {
        let filtered = Self::new(format!("filtered_{}", Uuid::new_v4()));
        let filtered_sender = filtered.sender.clone();
        let self_clone = self.clone();

        tokio::spawn(async move {
            loop {
                match self_clone.receive().await {
                    Ok(event) if predicate(&event) => {
                        let _ = filtered_sender.send(event);
                    }
                    Ok(_) => continue,
                    Err(_) => break,
                }
            }
        });

        filtered
    }

    /// Get the stream name
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Check if the stream is empty (approximate)
    /// Note: For unbounded channels, this always returns false
    pub fn is_empty(&self) -> bool {
        false
    }
}

impl std::fmt::Debug for ThoughtStream {
    /// Formats the stream as `ThoughtStream { name: .. }`; the channel halves
    /// are deliberately left out of the output.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut repr = f.debug_struct("ThoughtStream");
        repr.field("name", &self.name);
        repr.finish()
    }
}

/// Builder for creating thought streams.
///
/// Collects the stream name and an optional capacity before constructing a
/// [`ThoughtStream`].
pub struct ThoughtStreamBuilder {
    /// Name given to the stream that will be built.
    name: String,
    /// Requested channel capacity, if any.
    /// NOTE(review): `build` in this file does not read this value; the
    /// resulting stream is always unbounded — confirm whether that is intended.
    capacity: Option<usize>,
}

impl ThoughtStreamBuilder {
    /// Start a builder for a stream called `name`, with no capacity requested.
    pub fn new(name: impl Into<String>) -> Self {
        let name = name.into();
        Self {
            name,
            capacity: None,
        }
    }

    /// Record the requested channel capacity and hand the builder back.
    pub fn with_capacity(self, capacity: usize) -> Self {
        Self {
            capacity: Some(capacity),
            ..self
        }
    }

    /// Consume the builder and construct the thought stream.
    ///
    /// Only the name is used: the stream is created with
    /// `ThoughtStream::new`, and the recorded capacity is discarded.
    pub fn build(self) -> ThoughtStream {
        let Self { name, capacity: _ } = self;
        ThoughtStream::new(name)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// An event emitted on a stream comes back from `receive` with the same
    /// content and event type.
    #[tokio::test]
    async fn test_create_and_emit() {
        let stream = ThoughtStream::new("test");
        let sent = ThoughtEvent::observation(
            StructuredContent::text("test observation"),
            ProvenanceChain::new(),
        );

        stream.emit(sent.clone()).unwrap();
        let got = stream.receive().await.unwrap();

        assert_eq!(sent.content, got.content);
        assert_eq!(sent.event_type, got.event_type);
    }

    /// A filtered stream yields the observation that was emitted on its source.
    #[tokio::test]
    async fn test_filter_stream() {
        fn is_observation(event: &ThoughtEvent) -> bool {
            matches!(event.event_type, ThoughtEventType::Observation)
        }

        let source = ThoughtStream::new("test");

        let observation = ThoughtEvent::observation(
            StructuredContent::text("observation"),
            ProvenanceChain::new(),
        );
        let hypothesis = ThoughtEvent::hypothesis(
            StructuredContent::text("hypothesis"),
            ProvenanceChain::new(),
            0.5,
        );

        // Emit in the same order as before: observation first, then hypothesis.
        for event in [observation, hypothesis] {
            source.emit(event).unwrap();
        }

        let only_observations = source.filter(is_observation);

        let first = only_observations.receive().await.unwrap();
        assert_eq!(first.event_type, ThoughtEventType::Observation);
    }

    /// A confidence above the valid range is clamped down to `1.0`.
    #[test]
    fn test_confidence_bounds() {
        let too_confident = 1.5; // Should be clamped to 1.0
        let event = ThoughtEvent::hypothesis(
            StructuredContent::text("test"),
            ProvenanceChain::new(),
            too_confident,
        );

        assert_eq!(event.confidence, 1.0);
    }
}