#![cfg(feature = "streaming")]
use crate::colony::{Colony, ColonyEvent};
use phago_agents::digester::Digester;
use phago_core::types::{DocumentId, Position};
use std::cell::RefCell;
use std::path::{Path, PathBuf};
use std::rc::Rc;
use std::sync::mpsc;
use std::time::Duration;
use tokio::sync::broadcast;
/// A document queued for streaming ingestion into a [`Colony`].
///
/// Construct with [`IngestDocument::new`], then optionally chain the
/// `with_*` builder methods.
#[derive(Debug, Clone)]
pub struct IngestDocument {
/// Human-readable title.
pub title: String,
/// Raw text content to ingest.
pub content: String,
/// Explicit placement; `None` means the ingesting colony auto-assigns
/// a position from its layout counter.
pub position: Option<Position>,
/// Filesystem origin, when the document came from a file.
pub source_path: Option<PathBuf>,
}
impl IngestDocument {
    /// Creates a document with the given title and content; position and
    /// source path start unset.
    pub fn new(title: impl Into<String>, content: impl Into<String>) -> Self {
        Self {
            title: title.into(),
            content: content.into(),
            position: None,
            source_path: None,
        }
    }

    /// Builder: pins the document at an explicit `(x, y)` position,
    /// bypassing auto-layout.
    pub fn with_position(self, x: f64, y: f64) -> Self {
        Self {
            position: Some(Position::new(x, y)),
            ..self
        }
    }

    /// Builder: records the filesystem path the document originated from.
    pub fn with_source(self, path: impl Into<PathBuf>) -> Self {
        Self {
            source_path: Some(path.into()),
            ..self
        }
    }
}
/// Tuning knobs for [`StreamingColony`].
#[derive(Debug, Clone)]
pub struct StreamingConfig {
/// Bounded capacity for document channels built from this config.
pub queue_capacity: usize,
/// Number of simulation ticks run synchronously after each ingest.
pub ticks_per_document: u64,
/// Whether ticks should also run in the background.
/// NOTE(review): not read anywhere in this file — confirm it is consumed elsewhere.
pub background_ticks: bool,
/// Delay between ticks in `run_ticks_async`, in milliseconds.
pub tick_interval_ms: u64,
/// Horizontal spacing used when auto-assigning document positions.
pub auto_layout_spacing: f64,
}
impl Default for StreamingConfig {
fn default() -> Self {
Self {
queue_capacity: 100,
ticks_per_document: 10,
background_ticks: true,
tick_interval_ms: 100,
auto_layout_spacing: 5.0,
}
}
}
/// Counters describing streaming-ingestion progress.
/// Snapshots are obtained via [`StreamingColony::metrics`].
#[derive(Debug, Clone, Default)]
pub struct StreamingMetrics {
/// Documents handed to `ingest`.
pub documents_received: u64,
/// Documents successfully ingested into the colony.
pub documents_ingested: u64,
/// Documents dropped (e.g. queue overflow).
/// NOTE(review): never incremented in this file — confirm writers elsewhere.
pub documents_dropped: u64,
/// Current ingestion-queue depth.
/// NOTE(review): never updated in this file — confirm writers elsewhere.
pub queue_depth: usize,
/// Total simulation ticks processed.
pub ticks_processed: u64,
}
/// Wrapper around a [`Colony`] that supports incremental (streaming)
/// document ingestion with auto-layout and event broadcasting.
///
/// Uses `Rc<RefCell<_>>` internally, so it is neither `Send` nor `Sync`;
/// its async methods must run on a single-threaded executor.
pub struct StreamingColony {
// Wrapped colony; shared so `colony()` can hand out the Rc handle.
colony: Rc<RefCell<Colony>>,
config: StreamingConfig,
// Mutated during ingest/ticks; cloned out by `metrics()`.
metrics: Rc<RefCell<StreamingMetrics>>,
// Counter used only for auto-layout of documents without a position.
document_count: Rc<RefCell<u64>>,
// Broadcast fan-out of colony events; receivers via `subscribe_events()`.
event_tx: broadcast::Sender<ColonyEvent>,
}
impl StreamingColony {
    /// Wraps `colony` with the given streaming `config`.
    ///
    /// Creates a broadcast channel (capacity 256) for colony events; the
    /// initial receiver is dropped, so events emitted before the first
    /// `subscribe_events()` call are discarded.
    pub fn new(colony: Colony, config: StreamingConfig) -> Self {
        let (event_tx, _) = broadcast::channel(256);
        Self {
            colony: Rc::new(RefCell::new(colony)),
            config,
            metrics: Rc::new(RefCell::new(StreamingMetrics::default())),
            document_count: Rc::new(RefCell::new(0)),
            event_tx,
        }
    }

    /// Returns a fresh receiver for colony events emitted during ticks.
    pub fn subscribe_events(&self) -> broadcast::Receiver<ColonyEvent> {
        self.event_tx.subscribe()
    }

    /// Snapshot of the current streaming metrics.
    pub fn metrics(&self) -> StreamingMetrics {
        self.metrics.borrow().clone()
    }

    /// Ingests a document synchronously: resolves its position (auto-layout
    /// when none was given), spawns a digester agent at the same spot,
    /// updates metrics, and runs `ticks_per_document` ticks.
    pub fn ingest(&self, doc: IngestDocument) -> DocumentId {
        let position = doc.position.unwrap_or_else(|| {
            // Single mutable borrow: read the slot index and advance the
            // counter (the old code borrowed twice, once immutably and once
            // mutably, for the same read-modify-write).
            // Note the counter only advances for auto-placed documents.
            let mut count = self.document_count.borrow_mut();
            let slot = *count;
            *count += 1;
            Position::new(slot as f64 * self.config.auto_layout_spacing, 0.0)
        });
        let doc_id = {
            // One mutable borrow covers both the ingest and the spawn,
            // instead of re-borrowing the RefCell back-to-back.
            let mut colony = self.colony.borrow_mut();
            let doc_id = colony.ingest_document(&doc.title, &doc.content, position);
            colony.spawn(Box::new(Digester::new(position).with_max_idle(30)));
            doc_id
        };
        {
            let mut metrics = self.metrics.borrow_mut();
            metrics.documents_received += 1;
            metrics.documents_ingested += 1;
        }
        self.run_ticks(self.config.ticks_per_document);
        doc_id
    }

    /// Async wrapper around [`StreamingColony::ingest`]; yields once so
    /// other tasks on the executor can make progress first.
    pub async fn ingest_async(&self, doc: IngestDocument) -> DocumentId {
        tokio::task::yield_now().await;
        self.ingest(doc)
    }

    /// Runs `ticks` simulation ticks back-to-back, broadcasting resulting
    /// colony events (send errors — i.e. no subscribers — are ignored).
    pub fn run_ticks(&self, ticks: u64) {
        for _ in 0..ticks {
            let events = self.colony.borrow_mut().tick();
            self.metrics.borrow_mut().ticks_processed += 1;
            for event in events {
                let _ = self.event_tx.send(event);
            }
        }
    }

    /// Runs `ticks` ticks paced `tick_interval_ms` apart.
    ///
    /// The sleep happens *between* ticks, so there is no pointless trailing
    /// delay after the final tick (the previous implementation slept once
    /// more than necessary).
    pub async fn run_ticks_async(&self, ticks: u64) {
        let interval = Duration::from_millis(self.config.tick_interval_ms);
        for i in 0..ticks {
            if i > 0 {
                tokio::time::sleep(interval).await;
            }
            let events = self.colony.borrow_mut().tick();
            self.metrics.borrow_mut().ticks_processed += 1;
            for event in events {
                let _ = self.event_tx.send(event);
            }
        }
    }

    /// Drains `rx`, ingesting every document until the channel closes.
    pub async fn process_channel(
        &self,
        mut rx: tokio::sync::mpsc::Receiver<IngestDocument>,
    ) {
        while let Some(doc) = rx.recv().await {
            self.ingest_async(doc).await;
        }
    }

    /// Shared handle to the underlying colony.
    pub fn colony(&self) -> &Rc<RefCell<Colony>> {
        &self.colony
    }

    /// Consumes the wrapper and returns the inner colony.
    ///
    /// # Panics
    /// Panics if other `Rc` handles to the colony are still alive
    /// (e.g. clones obtained through [`StreamingColony::colony`]).
    pub fn into_colony(self) -> Colony {
        match Rc::try_unwrap(self.colony) {
            Ok(cell) => cell.into_inner(),
            Err(_) => panic!("Cannot unwrap StreamingColony: other references exist"),
        }
    }
}
/// Filesystem notification forwarded by [`FileWatcher`].
#[derive(Debug)]
pub enum WatchEvent {
/// A file with a matching extension was created or modified.
FileChanged(PathBuf),
/// A file with a matching extension was removed.
FileRemoved(PathBuf),
/// The underlying watcher reported an error (stringified).
Error(String),
}
/// Watches a directory tree and surfaces create/modify/remove events for
/// files whose extensions pass a configured filter.
pub struct FileWatcher {
// Root path being watched (recursively).
path: PathBuf,
// File extensions (without the leading dot) that pass the filter.
extensions: Vec<String>,
// Receiving end of the event pipeline fed by the notify callback.
rx: mpsc::Receiver<WatchEvent>,
// Held so the OS watcher stays alive as long as this struct does.
_watcher: notify::RecommendedWatcher,
}
impl FileWatcher {
pub fn new<P: AsRef<Path>>(path: P) -> Result<Self, String> {
Self::with_extensions(path, vec!["txt", "md", "json"])
}
pub fn with_extensions<P: AsRef<Path>>(
path: P,
extensions: Vec<&str>,
) -> Result<Self, String> {
use notify::{RecursiveMode, Watcher};
let path = path.as_ref().to_path_buf();
let extensions: Vec<String> = extensions.into_iter().map(|s| s.to_string()).collect();
let (tx, rx) = mpsc::channel();
let ext_clone = extensions.clone();
let mut watcher = notify::recommended_watcher(move |res: Result<notify::Event, notify::Error>| {
match res {
Ok(event) => {
for path in event.paths {
if let Some(ext) = path.extension() {
let ext_str = ext.to_string_lossy().to_string();
if ext_clone.contains(&ext_str) {
match event.kind {
notify::EventKind::Create(_) |
notify::EventKind::Modify(_) => {
let _ = tx.send(WatchEvent::FileChanged(path.clone()));
}
notify::EventKind::Remove(_) => {
let _ = tx.send(WatchEvent::FileRemoved(path.clone()));
}
_ => {}
}
}
}
}
}
Err(e) => {
let _ = tx.send(WatchEvent::Error(e.to_string()));
}
}
}).map_err(|e| format!("Failed to create watcher: {}", e))?;
watcher.watch(&path, RecursiveMode::Recursive)
.map_err(|e| format!("Failed to watch path: {}", e))?;
Ok(Self {
path,
extensions,
rx,
_watcher: watcher,
})
}
pub fn path(&self) -> &Path {
&self.path
}
pub fn extensions(&self) -> &[String] {
&self.extensions
}
pub fn try_recv(&self) -> Option<WatchEvent> {
self.rx.try_recv().ok()
}
pub fn recv(&self) -> Option<WatchEvent> {
self.rx.recv().ok()
}
pub fn recv_timeout(&self, timeout: Duration) -> Option<WatchEvent> {
self.rx.recv_timeout(timeout).ok()
}
}
/// Bounded tokio MPSC channel of [`IngestDocument`]s with a single,
/// takeable receiver.
pub struct DocumentChannel {
// Cloneable producer handle.
tx: tokio::sync::mpsc::Sender<IngestDocument>,
// Consumer end; `Some` until `take_receiver` claims it.
rx: Option<tokio::sync::mpsc::Receiver<IngestDocument>>,
// Capacity the channel was created with (kept for introspection).
capacity: usize,
}
impl DocumentChannel {
    /// Creates a bounded channel holding at most `capacity` in-flight documents.
    pub fn new(capacity: usize) -> Self {
        let (tx, rx) = tokio::sync::mpsc::channel(capacity);
        Self { tx, rx: Some(rx), capacity }
    }

    /// Returns a cloned producer handle.
    pub fn sender(&self) -> tokio::sync::mpsc::Sender<IngestDocument> {
        self.tx.clone()
    }

    /// Claims the consumer end; yields `None` on every call after the first.
    pub fn take_receiver(&mut self) -> Option<tokio::sync::mpsc::Receiver<IngestDocument>> {
        self.rx.take()
    }

    /// The capacity this channel was built with.
    pub fn capacity(&self) -> usize {
        self.capacity
    }

    /// Non-blocking send; `false` when the channel is full or closed.
    pub fn try_send(&self, doc: IngestDocument) -> bool {
        self.tx.try_send(doc).is_ok()
    }

    /// Awaiting send; errors with a message once the receiver is gone.
    pub async fn send(&self, doc: IngestDocument) -> Result<(), String> {
        match self.tx.send(doc).await {
            Ok(()) => Ok(()),
            Err(e) => Err(format!("Channel closed: {}", e)),
        }
    }
}
pub fn watch_directory_to_channel(
watcher: FileWatcher,
channel: DocumentChannel,
) -> std::thread::JoinHandle<()> {
let tx = channel.sender();
std::thread::spawn(move || {
loop {
match watcher.recv() {
Some(WatchEvent::FileChanged(path)) => {
if let Ok(content) = std::fs::read_to_string(&path) {
let title = path.file_name()
.map(|s| s.to_string_lossy().to_string())
.unwrap_or_else(|| "Untitled".to_string());
let doc = IngestDocument::new(title, content)
.with_source(path);
if tx.blocking_send(doc).is_err() {
break;
}
}
}
Some(WatchEvent::FileRemoved(_)) => {
}
Some(WatchEvent::Error(e)) => {
eprintln!("File watcher error: {}", e);
}
None => {
break;
}
}
}
})
}
/// Convenience constructor: wires a directory watcher to a new
/// [`StreamingColony`], returning the colony and the bridge thread handle.
///
/// NOTE(review): BUG — the receiver taken below is bound to `_rx` and
/// dropped when this function returns, so the documents sent by the
/// watcher thread have no consumer: `blocking_send` fails on the first
/// matching file change and the bridge thread exits. Nothing ever reaches
/// the colony. The receiver likely needs to be returned to the caller (or
/// driven via `StreamingColony::process_channel`); fixing this requires a
/// signature change, so it is flagged here rather than changed.
pub fn streaming_from_directory<P: AsRef<Path>>(
colony: Colony,
path: P,
config: StreamingConfig,
) -> Result<(StreamingColony, std::thread::JoinHandle<()>), String> {
let watcher = FileWatcher::new(path)?;
let mut channel = DocumentChannel::new(config.queue_capacity);
// See NOTE above: `_rx` is dropped immediately at end of this function.
let _rx = channel.take_receiver().ok_or("Channel receiver already taken")?;
let streaming = StreamingColony::new(colony, config);
let handle = watch_directory_to_channel(watcher, channel);
Ok((streaming, handle))
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::TempDir;
use std::fs;
// Ingesting one document bumps the ingested counter and runs ticks.
#[test]
fn ingest_document_basic() {
let colony = Colony::new();
let streaming = StreamingColony::new(colony, StreamingConfig::default());
let doc = IngestDocument::new("Test", "Content about cells");
let _id = streaming.ingest(doc);
let metrics = streaming.metrics();
assert_eq!(metrics.documents_ingested, 1);
assert!(metrics.ticks_processed > 0);
}
// Documents without explicit positions are auto-laid-out; all five ingest.
#[test]
fn auto_layout_positions() {
let mut config = StreamingConfig::default();
config.auto_layout_spacing = 10.0;
let colony = Colony::new();
let streaming = StreamingColony::new(colony, config);
for i in 0..5 {
let doc = IngestDocument::new(format!("Doc {}", i), "Content");
streaming.ingest(doc);
}
let metrics = streaming.metrics();
assert_eq!(metrics.documents_ingested, 5);
}
// A capacity-2 channel rejects the third non-blocking send.
#[test]
fn document_channel_backpressure() {
let channel = DocumentChannel::new(2);
assert!(channel.try_send(IngestDocument::new("1", "c")));
assert!(channel.try_send(IngestDocument::new("2", "c")));
assert!(!channel.try_send(IngestDocument::new("3", "c")));
}
// Awaiting send succeeds while the receiver is still held by the channel.
#[tokio::test]
async fn async_channel_send() {
let channel = DocumentChannel::new(10);
let tx = channel.sender();
tx.send(IngestDocument::new("Test", "Content")).await.unwrap();
}
// Watcher construction succeeds on an existing temp directory.
#[test]
fn file_watcher_creation() {
let temp_dir = TempDir::new().unwrap();
let watcher = FileWatcher::new(temp_dir.path());
assert!(watcher.is_ok());
}
// Smoke test: write a matching file and poll once; event delivery is
// timing-dependent, so the result is not asserted.
#[test]
fn file_watcher_detects_new_file() {
let temp_dir = TempDir::new().unwrap();
let watcher = FileWatcher::new(temp_dir.path()).unwrap();
let file_path = temp_dir.path().join("test.txt");
fs::write(&file_path, "Hello, world!").unwrap();
std::thread::sleep(Duration::from_millis(100));
let _event = watcher.try_recv();
}
}