//! Background learning daemon: ingests records, builds episodes, and
//! periodically trains and (optionally) applies models.

mod applier;
mod processor;
mod sink;
mod subscriber;

pub use applier::{Applier, ApplierConfig, ApplierError, ApplyMode, ApplyResult};
pub use processor::{ProcessResult, Processor, ProcessorConfig, ProcessorError, ProcessorMode};
pub use sink::{DataSink, DataSinkError, DataSinkStats};
pub use subscriber::{ActionEventSubscriber, EventSubscriberConfig, LearningEventSubscriber};

use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;

use tokio::sync::mpsc;
use tokio::time::interval;

use crate::learn::learn_model::{LearnModel, WorkerDecisionSequenceLearn};
use crate::learn::lora::{
    LoraTrainer, LoraTrainerConfig, ModelApplicator, NoOpApplicator, TrainedModel,
};
use crate::learn::record::Record;
use crate::learn::snapshot::LearningStore;
use crate::learn::store::{
    EpisodeStore, FileEpisodeStore, FileRecordStore, InMemoryEpisodeStore, InMemoryRecordStore,
    RecordStore, RecordStoreError, StoreError,
};
use crate::learn::trigger::{TrainTrigger, TriggerBuilder, TriggerContext};
use crate::util::epoch_millis;

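/// Errors that can occur while constructing or running the daemon.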
#[derive(Debug)]
pub enum DaemonError {
    /// Error from the data sink (record or episode storage).
    Sink(DataSinkError),
    /// Error from the training processor.
    Processor(ProcessorError),
    /// Error from the model applier.
    Applier(ApplierError),
    /// Underlying I/O error.
    Io(std::io::Error),
    /// Invalid or missing configuration.
    Config(String),
    /// The daemon was shut down.
    Shutdown,
}

impl std::fmt::Display for DaemonError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::Sink(e) => write!(f, "Sink error: {}", e),
            Self::Processor(e) => write!(f, "Processor error: {}", e),
            Self::Applier(e) => write!(f, "Applier error: {}", e),
            Self::Io(e) => write!(f, "IO error: {}", e),
            Self::Config(msg) => write!(f, "Config error: {}", msg),
            Self::Shutdown => write!(f, "Daemon shutdown"),
        }
    }
}

impl std::error::Error for DaemonError {}

impl From<DataSinkError> for DaemonError {
    fn from(e: DataSinkError) -> Self {
        Self::Sink(e)
    }
}

impl From<ProcessorError> for DaemonError {
    fn from(e: ProcessorError) -> Self {
        Self::Processor(e)
    }
}

impl From<ApplierError> for DaemonError {
    fn from(e: ApplierError) -> Self {
        Self::Applier(e)
    }
}

impl From<std::io::Error> for DaemonError {
    fn from(e: std::io::Error) -> Self {
        Self::Io(e)
    }
}

impl From<RecordStoreError> for DaemonError {
    fn from(e: RecordStoreError) -> Self {
        Self::Sink(DataSinkError::RecordStore(e))
    }
}

impl From<StoreError> for DaemonError {
    fn from(e: StoreError) -> Self {
        Self::Sink(DataSinkError::EpisodeStore(e))
    }
}

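/// Configuration for [`LearningDaemon`], with builder-style setters.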
#[derive(Debug, Clone)]
pub struct DaemonConfig {
    /// Scenario name used to tag training runs.
    pub scenario: String,
    /// Root directory for persisted learning data.
    pub data_dir: PathBuf,
    /// How often the training trigger is evaluated.
    pub check_interval: Duration,
    /// Processing mode for training runs.
    pub processor_mode: ProcessorMode,
    /// Maximum number of sessions processed per run.
    pub max_sessions: usize,
    /// Automatically apply trained models.
    pub auto_apply: bool,
    /// Optional LoRA trainer configuration.
    pub lora_config: Option<LoraTrainerConfig>,
}

impl DaemonConfig {
    pub fn new(scenario: impl Into<String>) -> Self {
        Self {
            scenario: scenario.into(),
            data_dir: default_data_dir(),
            check_interval: Duration::from_secs(10),
            processor_mode: ProcessorMode::OfflineOnly,
            max_sessions: 20,
            auto_apply: false,
            lora_config: None,
        }
    }

    pub fn data_dir(mut self, path: impl Into<PathBuf>) -> Self {
        self.data_dir = path.into();
        self
    }

    pub fn check_interval(mut self, interval: Duration) -> Self {
        self.check_interval = interval;
        self
    }

    pub fn processor_mode(mut self, mode: ProcessorMode) -> Self {
        self.processor_mode = mode;
        self
    }

    pub fn max_sessions(mut self, n: usize) -> Self {
        self.max_sessions = n;
        self
    }

    pub fn auto_apply(mut self, enabled: bool) -> Self {
        self.auto_apply = enabled;
        self
    }

    pub fn with_lora(mut self, config: LoraTrainerConfig) -> Self {
        self.lora_config = Some(config);
        self
    }
}

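/// Default data directory: the platform data dir (falling back to the
/// current directory) under `swarm-engine/learning`.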
fn default_data_dir() -> PathBuf {
    dirs::data_dir()
        .unwrap_or_else(|| PathBuf::from("."))
        .join("swarm-engine")
        .join("learning")
}

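/// Counters reported by a running daemon.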
#[derive(Debug, Clone, Default)]
pub struct DaemonStats {
    /// Total records received.
    pub records_received: usize,
    /// Total episodes created from those records.
    pub episodes_created: usize,
    /// Number of completed training runs.
    pub trainings_completed: usize,
    /// Number of trained models that were applied.
    pub models_applied: usize,
    /// Timestamp (epoch millis) of the last training run, if any.
    pub last_train_at: Option<u64>,
    /// Timestamp (epoch millis) when the daemon started.
    pub started_at: u64,
}

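/// Long-running daemon that ingests records, evaluates a training
/// trigger on a fixed interval, and runs the processor when it fires.
///
/// A minimal usage sketch (assumes a tokio runtime; `AlwaysTrigger` and
/// the `records` batch are illustrative):
///
/// ```ignore
/// let config = DaemonConfig::new("my-scenario");
/// let mut daemon = LearningDaemon::new(config, Arc::new(AlwaysTrigger))?;
/// let sender = daemon.record_sender();
/// let shutdown = daemon.shutdown_sender();
///
/// tokio::spawn(async move { daemon.run().await });
/// sender.send(records).await?;
/// // ... later:
/// shutdown.send(()).await?;
/// ```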
pub struct LearningDaemon {
    /// Daemon configuration.
    config: DaemonConfig,
    /// Sink that turns incoming records into stored episodes.
    sink: DataSink,
    /// Trigger deciding when to train.
    trigger: Arc<dyn TrainTrigger>,
    /// Processor that runs the training pipeline.
    processor: Processor,
    /// Optional applier for trained models.
    applier: Option<Applier>,
    /// Runtime counters.
    stats: DaemonStats,
    /// Episode count at the time of the last training run.
    last_train_count: usize,
    /// Incoming record batches; senders are handed out via `record_sender`.
    record_rx: mpsc::Receiver<Vec<Record>>,
    record_tx: mpsc::Sender<Vec<Record>>,
    /// Shutdown signal channel.
    shutdown_rx: mpsc::Receiver<()>,
    shutdown_tx: mpsc::Sender<()>,
}

impl LearningDaemon {
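    /// Creates a daemon backed by in-memory stores (useful for tests
    /// and ephemeral runs).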
    pub fn new(config: DaemonConfig, trigger: Arc<dyn TrainTrigger>) -> Result<Self, DaemonError> {
        let record_store: Arc<dyn RecordStore> = Arc::new(InMemoryRecordStore::new());
        let episode_store: Arc<dyn EpisodeStore> = Arc::new(InMemoryEpisodeStore::new());
        let learn_model: Arc<dyn LearnModel> = Arc::new(WorkerDecisionSequenceLearn::new());

        Self::with_stores(config, trigger, record_store, episode_store, learn_model)
    }

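    /// Creates a daemon with file-backed stores under `config.data_dir`.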
    pub fn with_file_stores(
        config: DaemonConfig,
        trigger: Arc<dyn TrainTrigger>,
    ) -> Result<Self, DaemonError> {
        std::fs::create_dir_all(&config.data_dir)?;

        let record_store: Arc<dyn RecordStore> =
            Arc::new(FileRecordStore::new(config.data_dir.join("records"))?);
        let episode_store: Arc<dyn EpisodeStore> =
            Arc::new(FileEpisodeStore::new(config.data_dir.join("episodes"))?);
        let learn_model: Arc<dyn LearnModel> = Arc::new(WorkerDecisionSequenceLearn::new());

        Self::with_stores(config, trigger, record_store, episode_store, learn_model)
    }

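    /// Creates a daemon from explicit store and model implementations.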
    pub fn with_stores(
        config: DaemonConfig,
        trigger: Arc<dyn TrainTrigger>,
        record_store: Arc<dyn RecordStore>,
        episode_store: Arc<dyn EpisodeStore>,
        learn_model: Arc<dyn LearnModel>,
    ) -> Result<Self, DaemonError> {
        // The sink shares the episode store and learn model with the
        // processor pipeline below.
        let sink = DataSink::new(
            record_store,
            Arc::clone(&episode_store),
            Arc::clone(&learn_model),
        );

        let processor_config = ProcessorConfig::new(&config.scenario)
            .mode(config.processor_mode)
            .max_sessions(config.max_sessions);

        let mut processor = Processor::new(processor_config);

        let learning_store = LearningStore::new(&config.data_dir)?;
        processor = processor.with_learning_store(learning_store);

        // LoRA training is only wired in when configured.
        if let Some(lora_config) = &config.lora_config {
            let trainer = LoraTrainer::new(lora_config.clone(), episode_store);
            processor = processor
                .with_lora_trainer(trainer)
                .with_learn_model(learn_model);
        }

        // Auto-apply installs a no-op applicator by default; a real one
        // can be supplied via `DaemonBuilder::applicator`.
        let applier = if config.auto_apply {
            let applier_config = ApplierConfig::default().auto_apply();
            let applicator: Arc<dyn ModelApplicator> = Arc::new(NoOpApplicator::new());
            Some(Applier::new(applier_config, applicator))
        } else {
            None
        };

        let (record_tx, record_rx) = mpsc::channel(1000);
        let (shutdown_tx, shutdown_rx) = mpsc::channel(1);

        Ok(Self {
            config,
            sink,
            trigger,
            processor,
            applier,
            stats: DaemonStats {
                started_at: epoch_millis(),
                ..Default::default()
            },
            last_train_count: 0,
            record_rx,
            record_tx,
            shutdown_rx,
            shutdown_tx,
        })
    }

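    /// Returns a sender for feeding record batches into the daemon.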
    pub fn record_sender(&self) -> mpsc::Sender<Vec<Record>> {
        self.record_tx.clone()
    }

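    /// Returns a sender that signals the daemon to shut down.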
    pub fn shutdown_sender(&self) -> mpsc::Sender<()> {
        self.shutdown_tx.clone()
    }

    pub fn config(&self) -> &DaemonConfig {
        &self.config
    }

    pub fn stats(&self) -> &DaemonStats {
        &self.stats
    }

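    /// Runs the daemon loop until a shutdown signal is received:
    /// ingests record batches as they arrive and evaluates the
    /// training trigger on every `check_interval` tick.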
    pub async fn run(&mut self) -> Result<(), DaemonError> {
        tracing::info!(
            scenario = %self.config.scenario,
            data_dir = %self.config.data_dir.display(),
            trigger = self.trigger.name(),
            "Learning daemon started"
        );

        let mut check_interval = interval(self.config.check_interval);

        loop {
            tokio::select! {
                // Graceful shutdown.
                _ = self.shutdown_rx.recv() => {
                    tracing::info!("Shutdown signal received");
                    return Ok(());
                }

                // Incoming record batches.
                Some(records) = self.record_rx.recv() => {
                    self.handle_records(records).await?;
                }

                // Periodic trigger evaluation.
                _ = check_interval.tick() => {
                    self.check_and_train().await?;
                }
            }
        }
    }

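    /// Ingests a batch of records through the sink and updates counters.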
    async fn handle_records(&mut self, records: Vec<Record>) -> Result<(), DaemonError> {
        if records.is_empty() {
            return Ok(());
        }

        let count = records.len();
        let episode_ids = self.sink.ingest(records)?;

        self.stats.records_received += count;
        self.stats.episodes_created += episode_ids.len();

        tracing::debug!(
            records = count,
            episodes = episode_ids.len(),
            "Processed records"
        );

        Ok(())
    }

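    /// Evaluates the trigger and, if it fires, runs a training cycle
    /// and optionally applies the resulting model.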
    async fn check_and_train(&mut self) -> Result<(), DaemonError> {
        let current_count = self.sink.episode_count();
        let ctx = TriggerContext::with_count(current_count)
            .last_train_at(self.stats.last_train_at.unwrap_or(0))
            .last_train_count(self.last_train_count);

        // A trigger error is treated as "don't train".
        if !self.trigger.should_train(&ctx).unwrap_or(false) {
            return Ok(());
        }

        tracing::info!(
            episode_count = current_count,
            trigger = self.trigger.name(),
            "Trigger fired, starting learning"
        );

        let result = self
            .processor
            .run(self.sink.episode_store().as_ref())
            .await?;

        self.stats.trainings_completed += 1;
        self.stats.last_train_at = Some(epoch_millis());
        self.last_train_count = current_count;

        // Apply the trained model if an applier is configured.
        if let Some(applier) = &mut self.applier {
            if let Some(model) = result.lora_model() {
                let apply_result = applier.apply(model).await?;
                if apply_result.is_applied() {
                    self.stats.models_applied += 1;
                }
            }
        }

        tracing::info!(
            trainings = self.stats.trainings_completed,
            models_applied = self.stats.models_applied,
            "Learning cycle completed"
        );

        Ok(())
    }

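    /// Runs a training cycle immediately, bypassing the trigger.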
    pub async fn train_now(&mut self) -> Result<ProcessResult, DaemonError> {
        tracing::info!("Manual training triggered");

        let result = self
            .processor
            .run(self.sink.episode_store().as_ref())
            .await?;

        self.stats.trainings_completed += 1;
        self.stats.last_train_at = Some(epoch_millis());
        self.last_train_count = self.sink.episode_count();

        Ok(result)
    }

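    /// Applies a trained model immediately; errors if no applier is
    /// configured.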
    pub async fn apply_model(&mut self, model: &TrainedModel) -> Result<ApplyResult, DaemonError> {
        let applier = self
            .applier
            .as_mut()
            .ok_or_else(|| DaemonError::Config("Applier not configured".into()))?;

        let result = applier.apply_now(model).await?;
        if result.is_applied() {
            self.stats.models_applied += 1;
        }

        Ok(result)
    }
}

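/// Builder for [`LearningDaemon`] that fills in defaults for anything
/// not supplied (default watch trigger, in-memory stores).
///
/// A minimal sketch:
///
/// ```ignore
/// let daemon = DaemonBuilder::new("my-scenario")
///     .trigger(Arc::new(AlwaysTrigger))
///     .auto_apply()
///     .build()?;
/// ```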
pub struct DaemonBuilder {
    config: DaemonConfig,
    trigger: Option<Arc<dyn TrainTrigger>>,
    record_store: Option<Arc<dyn RecordStore>>,
    episode_store: Option<Arc<dyn EpisodeStore>>,
    learn_model: Option<Arc<dyn LearnModel>>,
    applicator: Option<Arc<dyn ModelApplicator>>,
}

impl DaemonBuilder {
    pub fn new(scenario: impl Into<String>) -> Self {
        Self {
            config: DaemonConfig::new(scenario),
            trigger: None,
            record_store: None,
            episode_store: None,
            learn_model: None,
            applicator: None,
        }
    }

    pub fn data_dir(mut self, path: impl Into<PathBuf>) -> Self {
        self.config.data_dir = path.into();
        self
    }

    pub fn trigger(mut self, trigger: Arc<dyn TrainTrigger>) -> Self {
        self.trigger = Some(trigger);
        self
    }

    pub fn processor_mode(mut self, mode: ProcessorMode) -> Self {
        self.config.processor_mode = mode;
        self
    }

    pub fn auto_apply(mut self) -> Self {
        self.config.auto_apply = true;
        self
    }

    pub fn record_store(mut self, store: Arc<dyn RecordStore>) -> Self {
        self.record_store = Some(store);
        self
    }

    pub fn episode_store(mut self, store: Arc<dyn EpisodeStore>) -> Self {
        self.episode_store = Some(store);
        self
    }

    pub fn learn_model(mut self, model: Arc<dyn LearnModel>) -> Self {
        self.learn_model = Some(model);
        self
    }

    pub fn applicator(mut self, applicator: Arc<dyn ModelApplicator>) -> Self {
        self.applicator = Some(applicator);
        self
    }

    pub fn with_lora(mut self, config: LoraTrainerConfig) -> Self {
        self.config.lora_config = Some(config);
        self
    }

    pub fn build(self) -> Result<LearningDaemon, DaemonError> {
        let trigger = self.trigger.unwrap_or_else(TriggerBuilder::default_watch);

        let record_store = self
            .record_store
            .unwrap_or_else(|| Arc::new(InMemoryRecordStore::new()));

        let episode_store = self
            .episode_store
            .unwrap_or_else(|| Arc::new(InMemoryEpisodeStore::new()));

        let learn_model = self
            .learn_model
            .unwrap_or_else(|| Arc::new(WorkerDecisionSequenceLearn::new()));

        let mut daemon = LearningDaemon::with_stores(
            self.config,
            trigger,
            record_store,
            episode_store,
            learn_model,
        )?;

        // An explicitly supplied applicator replaces the default no-op
        // one, so a custom applier is available even when auto-apply is
        // off.
        if let Some(applicator) = self.applicator {
            let mut applier_config = ApplierConfig::default();
            if daemon.config.auto_apply {
                applier_config = applier_config.auto_apply();
            }
            daemon.applier = Some(Applier::new(applier_config, applicator));
        }

        Ok(daemon)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::events::{ActionContext, ActionEventBuilder, ActionEventResult};
    use crate::learn::trigger::AlwaysTrigger;
    use crate::types::WorkerId;

    fn make_test_records(count: usize) -> Vec<Record> {
        (0..count)
            .map(|i| {
                let event = ActionEventBuilder::new(i as u64, WorkerId(0), &format!("Action{}", i))
                    .result(ActionEventResult::success())
                    .duration(std::time::Duration::from_millis(10))
                    .context(ActionContext::new())
                    .build();
                Record::from(&event)
            })
            .collect()
    }

    #[test]
    fn test_daemon_config_builder() {
        let config = DaemonConfig::new("test")
            .data_dir("/tmp/test")
            .check_interval(Duration::from_secs(30))
            .processor_mode(ProcessorMode::Full)
            .auto_apply(true);

        assert_eq!(config.scenario, "test");
        assert_eq!(config.data_dir, PathBuf::from("/tmp/test"));
        assert_eq!(config.check_interval, Duration::from_secs(30));
        assert_eq!(config.processor_mode, ProcessorMode::Full);
        assert!(config.auto_apply);
    }

    #[tokio::test]
    async fn test_daemon_creation() {
        let config = DaemonConfig::new("test");
        let trigger = TriggerBuilder::never();

        let daemon = LearningDaemon::new(config, trigger).unwrap();
        assert_eq!(daemon.config().scenario, "test");
        assert_eq!(daemon.stats().records_received, 0);
    }

    #[tokio::test]
    async fn test_daemon_record_ingestion() {
        let config = DaemonConfig::new("test");
        let trigger = TriggerBuilder::never();
        let mut daemon = LearningDaemon::new(config, trigger).unwrap();
        let sender = daemon.record_sender();

        // Records sent through the channel are only drained by `run()`,
        // which is not running here; they stay queued.
        let records = make_test_records(5);
        sender.send(records).await.unwrap();

        // Only the directly handled batch is counted.
        daemon.handle_records(make_test_records(3)).await.unwrap();

        assert_eq!(daemon.stats().records_received, 3);
    }

    #[tokio::test]
    async fn test_daemon_builder() {
        let daemon = DaemonBuilder::new("test-scenario")
            .data_dir("/tmp/test")
            .trigger(Arc::new(AlwaysTrigger))
            .processor_mode(ProcessorMode::OfflineOnly)
            .build()
            .unwrap();

        assert_eq!(daemon.config().scenario, "test-scenario");
    }
}