use crate::types::{ProvenanceChain, StructuredContent, Timestamp};
use crate::Result;
use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};
use uuid::Uuid;
/// Category tag carried by every [`ThoughtEvent`].
///
/// With serde the variants are written in snake_case, so `Observation`
/// is serialized as `"observation"`, `Hypothesis` as `"hypothesis"`, and so on.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "snake_case")]
pub enum ThoughtEventType {
    /// Tag used by [`ThoughtEvent::observation`].
    Observation,
    /// Tag used by [`ThoughtEvent::hypothesis`], which takes a caller-supplied confidence.
    Hypothesis,
    /// Tag used by [`ThoughtEvent::intention`].
    Intention,
    /// Tag used by [`ThoughtEvent::reflection`].
    Reflection,
    /// No dedicated constructor is visible in this file; build with [`ThoughtEvent::new`].
    Action,
    /// No dedicated constructor is visible in this file; build with [`ThoughtEvent::new`].
    Warning,
    /// No dedicated constructor is visible in this file; build with [`ThoughtEvent::new`].
    Success,
}
/// A single event travelling through a [`ThoughtStream`].
///
/// All fields are public, so values can also be built directly or
/// deserialized; only [`ThoughtEvent::new`] (and the helpers that call it)
/// fills in `id` and `timestamp` and clamps `confidence`.
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
pub struct ThoughtEvent {
    /// Event identifier; [`ThoughtEvent::new`] assigns a random v4 UUID.
    pub id: Uuid,
    /// Set from `Timestamp::now()` by [`ThoughtEvent::new`].
    pub timestamp: Timestamp,
    /// Category of the event. Serialized under the key `"type"`.
    #[serde(rename = "type")]
    pub event_type: ThoughtEventType,
    /// Payload of the event (project-defined type).
    pub content: StructuredContent,
    /// Provenance information attached to the event (project-defined type).
    pub provenance: ProvenanceChain,
    /// Confidence score. [`ThoughtEvent::new`] clamps it into `0.0..=1.0`;
    /// direct construction and deserialization do not.
    pub confidence: f64,
}
impl ThoughtEvent {
    /// Builds an event with a random v4 `id` and a `timestamp` taken from
    /// `Timestamp::now()`.
    ///
    /// `confidence` is normalized into `0.0..=1.0`: values below `0.0` become
    /// `0.0`, values above `1.0` become `1.0`, and NaN becomes `0.0`.
    pub fn new(
        event_type: ThoughtEventType,
        content: StructuredContent,
        provenance: ProvenanceChain,
        confidence: f64,
    ) -> Self {
        // `f64::clamp` returns NaN for a NaN input. A NaN confidence would
        // escape the intended range, make the event compare unequal to itself
        // under the derived `PartialEq`, and make both threshold predicates
        // below return `false`. Treat it as "no confidence" instead.
        let confidence = if confidence.is_nan() {
            0.0
        } else {
            confidence.clamp(0.0, 1.0)
        };

        Self {
            id: Uuid::new_v4(),
            timestamp: Timestamp::now(),
            event_type,
            content,
            provenance,
            confidence,
        }
    }

    /// Builds an [`ThoughtEventType::Observation`] event with confidence `1.0`.
    pub fn observation(content: StructuredContent, provenance: ProvenanceChain) -> Self {
        Self::new(ThoughtEventType::Observation, content, provenance, 1.0)
    }

    /// Builds a [`ThoughtEventType::Hypothesis`] event with the given
    /// `confidence`, normalized as described on [`ThoughtEvent::new`].
    pub fn hypothesis(
        content: StructuredContent,
        provenance: ProvenanceChain,
        confidence: f64,
    ) -> Self {
        Self::new(ThoughtEventType::Hypothesis, content, provenance, confidence)
    }

    /// Builds an [`ThoughtEventType::Intention`] event with confidence `1.0`.
    pub fn intention(content: StructuredContent, provenance: ProvenanceChain) -> Self {
        Self::new(ThoughtEventType::Intention, content, provenance, 1.0)
    }

    /// Builds a [`ThoughtEventType::Reflection`] event with confidence `1.0`.
    pub fn reflection(content: StructuredContent, provenance: ProvenanceChain) -> Self {
        Self::new(ThoughtEventType::Reflection, content, provenance, 1.0)
    }

    /// Returns `true` when the event type is [`ThoughtEventType::Hypothesis`].
    pub fn is_hypothesis(&self) -> bool {
        matches!(self.event_type, ThoughtEventType::Hypothesis)
    }

    /// Returns `true` when `confidence` is strictly greater than `0.8`.
    pub fn is_high_confidence(&self) -> bool {
        self.confidence > 0.8
    }

    /// Returns `true` when `confidence` is strictly less than `0.5`.
    ///
    /// Values in `0.5..=0.8` are neither low nor high confidence.
    pub fn is_low_confidence(&self) -> bool {
        self.confidence < 0.5
    }
}
/// An asynchronous transformation over a batch of [`ThoughtEvent`]s.
///
/// Implementors must be `Send + Sync`. No implementation is visible in this
/// file, so nothing beyond the signature is specified here.
#[async_trait]
pub trait StreamOperator: Send + Sync {
    /// Consumes `events` and returns the resulting batch, or an error.
    async fn apply(&self, events: Vec<ThoughtEvent>) -> Result<Vec<ThoughtEvent>>;
}
/// A named stream of [`ThoughtEvent`]s backed by a Tokio unbounded mpsc channel.
///
/// Cloning yields another handle to the *same* channel: both halves sit
/// behind `Arc`, so every clone emits into and receives from one shared queue.
#[derive(Clone)]
pub struct ThoughtStream {
    /// Label returned by `name()` and printed by the `Debug` impl.
    name: String,
    /// Sending half, shared by all clones of this handle.
    sender: Arc<mpsc::UnboundedSender<ThoughtEvent>>,
    /// Receiving half, shared by all clones; the lock serializes receivers.
    receiver: Arc<RwLock<mpsc::UnboundedReceiver<ThoughtEvent>>>,
}
impl ThoughtStream {
pub fn new(name: impl Into<String>) -> Self {
let (sender, receiver) = mpsc::unbounded_channel();
Self {
name: name.into(),
sender: Arc::new(sender),
receiver: Arc::new(RwLock::new(receiver)),
}
}
pub fn merge(streams: Vec<Self>) -> Self {
let merged = Self::new(format!("merged_{}", uuid::Uuid::new_v4()));
for stream in streams {
let sender = merged.sender.clone();
tokio::spawn(async move {
while let Ok(event) = stream.receive().await {
let _ = sender.send(event);
}
});
}
merged
}
pub fn emit(&self, event: ThoughtEvent) -> Result<()> {
self.sender
.send(event)
.map_err(|e| crate::Error::ThoughtStreamError(e.to_string()))
}
pub async fn emit_async(&self, event: ThoughtEvent) -> Result<()> {
self.emit(event)
}
pub async fn receive(&self) -> Result<ThoughtEvent> {
let mut receiver = self.receiver.write().await;
receiver
.recv()
.await
.ok_or_else(|| crate::Error::ThoughtStreamError("Stream closed".into()))
}
pub fn filter(&self, predicate: fn(&ThoughtEvent) -> bool) -> Self {
let filtered = Self::new(format!("filtered_{}", Uuid::new_v4()));
let filtered_sender = filtered.sender.clone();
let self_clone = self.clone();
tokio::spawn(async move {
loop {
match self_clone.receive().await {
Ok(event) if predicate(&event) => {
let _ = filtered_sender.send(event);
}
Ok(_) => continue,
Err(_) => break,
}
}
});
filtered
}
pub fn name(&self) -> &str {
&self.name
}
pub fn is_empty(&self) -> bool {
false
}
}
/// Manual `Debug` that prints only the stream's `name`; the channel halves
/// are left out of the output.
impl std::fmt::Debug for ThoughtStream {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("ThoughtStream");
        out.field("name", &self.name);
        out.finish()
    }
}
/// Builder that collects the settings for a new stream.
pub struct ThoughtStreamBuilder {
    /// Name to give the stream.
    name: String,
    /// Requested capacity, if any was set on the builder.
    capacity: Option<usize>,
}
impl ThoughtStreamBuilder {
    /// Starts a builder for a stream called `name`, with no capacity set.
    pub fn new(name: impl Into<String>) -> Self {
        let name = name.into();
        Self {
            name,
            capacity: None,
        }
    }

    /// Records `capacity` on the builder and returns it for chaining.
    ///
    /// NOTE(review): `build` does not read this value and
    /// `ThoughtStream::new` creates an unbounded channel, so the capacity
    /// currently has no effect on the resulting stream.
    pub fn with_capacity(self, capacity: usize) -> Self {
        Self {
            capacity: Some(capacity),
            ..self
        }
    }

    /// Consumes the builder and creates the stream via `ThoughtStream::new`,
    /// passing only the name.
    pub fn build(self) -> ThoughtStream {
        let Self { name, .. } = self;
        ThoughtStream::new(name)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// An event emitted on a stream is returned by `receive` on that stream.
    #[tokio::test]
    async fn test_create_and_emit() {
        let stream = ThoughtStream::new("test");
        let event = ThoughtEvent::observation(
            StructuredContent::text("test observation"),
            ProvenanceChain::new(),
        );

        stream.emit(event.clone()).unwrap();
        let received = stream.receive().await.unwrap();

        assert_eq!(event.content, received.content);
        assert_eq!(event.event_type, received.event_type);
    }

    /// A filtered stream delivers the matching event, payload included.
    #[tokio::test]
    async fn test_filter_stream() {
        let stream = ThoughtStream::new("test");
        let obs = ThoughtEvent::observation(
            StructuredContent::text("observation"),
            ProvenanceChain::new(),
        );
        let hyp = ThoughtEvent::hypothesis(
            StructuredContent::text("hypothesis"),
            ProvenanceChain::new(),
            0.5,
        );

        stream.emit(obs.clone()).unwrap();
        stream.emit(hyp).unwrap();

        let filtered = stream.filter(|e| matches!(e.event_type, ThoughtEventType::Observation));
        let received = filtered.receive().await.unwrap();

        assert_eq!(received.event_type, ThoughtEventType::Observation);
        // The forwarded event must be the observation that was emitted, not
        // merely something carrying the right tag.
        assert_eq!(received.content, obs.content);
    }

    /// Out-of-range confidences are clamped at both ends of `0.0..=1.0`.
    #[test]
    fn test_confidence_bounds() {
        let above = ThoughtEvent::hypothesis(
            StructuredContent::text("test"),
            ProvenanceChain::new(),
            1.5,
        );
        assert_eq!(above.confidence, 1.0);

        let below = ThoughtEvent::hypothesis(
            StructuredContent::text("test"),
            ProvenanceChain::new(),
            -0.5,
        );
        assert_eq!(below.confidence, 0.0);
    }
}