1mod applier;
30mod processor;
31mod sink;
32
33pub use applier::{Applier, ApplierConfig, ApplierError, ApplyMode, ApplyResult};
34pub use processor::{ProcessResult, Processor, ProcessorConfig, ProcessorError, ProcessorMode};
35pub use sink::{DataSink, DataSinkError, DataSinkStats};
36
37use std::path::PathBuf;
38use std::sync::Arc;
39use std::time::Duration;
40
41use tokio::sync::mpsc;
42use tokio::time::interval;
43
44use crate::learn::learn_model::{LearnModel, WorkerDecisionSequenceLearn};
45use crate::learn::lora::{
46 LoraTrainer, LoraTrainerConfig, ModelApplicator, NoOpApplicator, TrainedModel,
47};
48use crate::learn::record::Record;
49use crate::learn::snapshot::LearningStore;
50use crate::learn::store::{
51 EpisodeStore, FileEpisodeStore, FileRecordStore, InMemoryEpisodeStore, InMemoryRecordStore,
52 RecordStore, RecordStoreError, StoreError,
53};
54use crate::learn::trigger::{TrainTrigger, TriggerBuilder, TriggerContext};
55use crate::util::epoch_millis;
56
/// Errors produced by the learning daemon and its subcomponents.
#[derive(Debug)]
pub enum DaemonError {
    /// Failure while ingesting records into the data sink.
    Sink(DataSinkError),
    /// Failure during a processing/training run.
    Processor(ProcessorError),
    /// Failure while applying a trained model.
    Applier(ApplierError),
    /// Underlying filesystem/IO failure.
    Io(std::io::Error),
    /// Invalid or missing configuration (message describes the problem).
    Config(String),
    /// The daemon was asked to shut down.
    Shutdown,
}
77
78impl std::fmt::Display for DaemonError {
79 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80 match self {
81 Self::Sink(e) => write!(f, "Sink error: {}", e),
82 Self::Processor(e) => write!(f, "Processor error: {}", e),
83 Self::Applier(e) => write!(f, "Applier error: {}", e),
84 Self::Io(e) => write!(f, "IO error: {}", e),
85 Self::Config(msg) => write!(f, "Config error: {}", msg),
86 Self::Shutdown => write!(f, "Daemon shutdown"),
87 }
88 }
89}
90
91impl std::error::Error for DaemonError {}
92
93impl From<DataSinkError> for DaemonError {
94 fn from(e: DataSinkError) -> Self {
95 Self::Sink(e)
96 }
97}
98
99impl From<ProcessorError> for DaemonError {
100 fn from(e: ProcessorError) -> Self {
101 Self::Processor(e)
102 }
103}
104
105impl From<ApplierError> for DaemonError {
106 fn from(e: ApplierError) -> Self {
107 Self::Applier(e)
108 }
109}
110
111impl From<std::io::Error> for DaemonError {
112 fn from(e: std::io::Error) -> Self {
113 Self::Io(e)
114 }
115}
116
117impl From<RecordStoreError> for DaemonError {
118 fn from(e: RecordStoreError) -> Self {
119 Self::Sink(DataSinkError::RecordStore(e))
120 }
121}
122
123impl From<StoreError> for DaemonError {
124 fn from(e: StoreError) -> Self {
125 Self::Sink(DataSinkError::EpisodeStore(e))
126 }
127}
128
/// Configuration for a [`LearningDaemon`].
#[derive(Debug, Clone)]
pub struct DaemonConfig {
    /// Scenario name the processor runs under.
    pub scenario: String,
    /// Root directory for learning data (stores, snapshots).
    pub data_dir: PathBuf,
    /// How often the daemon polls the trigger for a training decision.
    pub check_interval: Duration,
    /// Processing mode passed through to the [`Processor`].
    pub processor_mode: ProcessorMode,
    /// Maximum number of sessions the processor considers per run.
    pub max_sessions: usize,
    /// When true, the daemon constructs an [`Applier`] and applies trained
    /// models automatically after each training cycle.
    pub auto_apply: bool,
    /// Optional LoRA trainer configuration; `None` disables LoRA training.
    pub lora_config: Option<LoraTrainerConfig>,
}
151
152impl DaemonConfig {
153 pub fn new(scenario: impl Into<String>) -> Self {
155 Self {
156 scenario: scenario.into(),
157 data_dir: default_data_dir(),
158 check_interval: Duration::from_secs(10),
159 processor_mode: ProcessorMode::OfflineOnly,
160 max_sessions: 20,
161 auto_apply: false,
162 lora_config: None,
163 }
164 }
165
166 pub fn data_dir(mut self, path: impl Into<PathBuf>) -> Self {
168 self.data_dir = path.into();
169 self
170 }
171
172 pub fn check_interval(mut self, interval: Duration) -> Self {
174 self.check_interval = interval;
175 self
176 }
177
178 pub fn processor_mode(mut self, mode: ProcessorMode) -> Self {
180 self.processor_mode = mode;
181 self
182 }
183
184 pub fn max_sessions(mut self, n: usize) -> Self {
186 self.max_sessions = n;
187 self
188 }
189
190 pub fn auto_apply(mut self, enabled: bool) -> Self {
192 self.auto_apply = enabled;
193 self
194 }
195
196 pub fn with_lora(mut self, config: LoraTrainerConfig) -> Self {
198 self.lora_config = Some(config);
199 self
200 }
201}
202
203fn default_data_dir() -> PathBuf {
204 dirs::data_dir()
205 .unwrap_or_else(|| PathBuf::from("."))
206 .join("swarm-engine")
207 .join("learning")
208}
209
/// Runtime counters maintained by a [`LearningDaemon`].
#[derive(Debug, Clone, Default)]
pub struct DaemonStats {
    /// Total records ingested through the data sink.
    pub records_received: usize,
    /// Total episodes created from ingested records.
    pub episodes_created: usize,
    /// Number of completed training runs (triggered or manual).
    pub trainings_completed: usize,
    /// Number of trained models that were actually applied.
    pub models_applied: usize,
    /// Epoch-millis timestamp of the most recent training run, if any.
    pub last_train_at: Option<u64>,
    /// Epoch-millis timestamp when the daemon was constructed.
    pub started_at: u64,
}
230
/// Background daemon that ingests [`Record`] batches, consults a
/// [`TrainTrigger`] on a fixed interval, runs the [`Processor`] when the
/// trigger fires, and optionally applies the resulting model via an
/// [`Applier`].
pub struct LearningDaemon {
    /// Daemon configuration.
    config: DaemonConfig,
    /// Ingestion sink turning records into stored episodes.
    sink: DataSink,
    /// Policy deciding when a training run should start.
    trigger: Arc<dyn TrainTrigger>,
    /// Executes training/processing runs.
    processor: Processor,
    /// Applies trained models; `None` unless auto-apply is configured.
    applier: Option<Applier>,
    /// Runtime counters.
    stats: DaemonStats,
    /// Episode count recorded at the last training run (trigger context).
    last_train_count: usize,
    /// Receiving end of the record ingestion channel (drained in `run`).
    record_rx: mpsc::Receiver<Vec<Record>>,
    /// Sending end handed out via `record_sender`.
    record_tx: mpsc::Sender<Vec<Record>>,
    /// Receiving end of the shutdown channel.
    shutdown_rx: mpsc::Receiver<()>,
    /// Sending end handed out via `shutdown_sender`.
    shutdown_tx: mpsc::Sender<()>,
}
260
261impl LearningDaemon {
262 pub fn new(config: DaemonConfig, trigger: Arc<dyn TrainTrigger>) -> Result<Self, DaemonError> {
264 let record_store: Arc<dyn RecordStore> = Arc::new(InMemoryRecordStore::new());
265 let episode_store: Arc<dyn EpisodeStore> = Arc::new(InMemoryEpisodeStore::new());
266 let learn_model: Arc<dyn LearnModel> = Arc::new(WorkerDecisionSequenceLearn::new());
267
268 Self::with_stores(config, trigger, record_store, episode_store, learn_model)
269 }
270
271 pub fn with_file_stores(
273 config: DaemonConfig,
274 trigger: Arc<dyn TrainTrigger>,
275 ) -> Result<Self, DaemonError> {
276 std::fs::create_dir_all(&config.data_dir)?;
278
279 let record_store: Arc<dyn RecordStore> =
280 Arc::new(FileRecordStore::new(config.data_dir.join("records"))?);
281 let episode_store: Arc<dyn EpisodeStore> =
282 Arc::new(FileEpisodeStore::new(config.data_dir.join("episodes"))?);
283 let learn_model: Arc<dyn LearnModel> = Arc::new(WorkerDecisionSequenceLearn::new());
284
285 Self::with_stores(config, trigger, record_store, episode_store, learn_model)
286 }
287
288 pub fn with_stores(
290 config: DaemonConfig,
291 trigger: Arc<dyn TrainTrigger>,
292 record_store: Arc<dyn RecordStore>,
293 episode_store: Arc<dyn EpisodeStore>,
294 learn_model: Arc<dyn LearnModel>,
295 ) -> Result<Self, DaemonError> {
296 let sink = DataSink::new(
298 record_store,
299 Arc::clone(&episode_store),
300 Arc::clone(&learn_model),
301 );
302
303 let processor_config = ProcessorConfig::new(&config.scenario)
305 .mode(config.processor_mode)
306 .max_sessions(config.max_sessions);
307
308 let mut processor = Processor::new(processor_config);
309
310 let learning_store = LearningStore::new(&config.data_dir)?;
312 processor = processor.with_learning_store(learning_store);
313
314 if let Some(lora_config) = &config.lora_config {
316 let trainer = LoraTrainer::new(lora_config.clone(), episode_store);
317 processor = processor
318 .with_lora_trainer(trainer)
319 .with_learn_model(learn_model);
320 }
321
322 let applier = if config.auto_apply {
324 let applier_config = ApplierConfig::default().auto_apply();
325 let applicator: Arc<dyn ModelApplicator> = Arc::new(NoOpApplicator::new());
327 Some(Applier::new(applier_config, applicator))
328 } else {
329 None
330 };
331
332 let (record_tx, record_rx) = mpsc::channel(1000);
334 let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
335
336 Ok(Self {
337 config,
338 sink,
339 trigger,
340 processor,
341 applier,
342 stats: DaemonStats {
343 started_at: epoch_millis(),
344 ..Default::default()
345 },
346 last_train_count: 0,
347 record_rx,
348 record_tx,
349 shutdown_rx,
350 shutdown_tx,
351 })
352 }
353
354 pub fn record_sender(&self) -> mpsc::Sender<Vec<Record>> {
356 self.record_tx.clone()
357 }
358
359 pub fn shutdown_sender(&self) -> mpsc::Sender<()> {
361 self.shutdown_tx.clone()
362 }
363
364 pub fn config(&self) -> &DaemonConfig {
366 &self.config
367 }
368
369 pub fn stats(&self) -> &DaemonStats {
371 &self.stats
372 }
373
374 pub async fn run(&mut self) -> Result<(), DaemonError> {
376 tracing::info!(
377 scenario = %self.config.scenario,
378 data_dir = %self.config.data_dir.display(),
379 trigger = self.trigger.name(),
380 "Learning daemon started"
381 );
382
383 let mut check_interval = interval(self.config.check_interval);
384
385 loop {
386 tokio::select! {
387 _ = self.shutdown_rx.recv() => {
389 tracing::info!("Shutdown signal received");
390 return Ok(());
391 }
392
393 Some(records) = self.record_rx.recv() => {
395 self.handle_records(records).await?;
396 }
397
398 _ = check_interval.tick() => {
400 self.check_and_train().await?;
401 }
402 }
403 }
404 }
405
406 async fn handle_records(&mut self, records: Vec<Record>) -> Result<(), DaemonError> {
408 if records.is_empty() {
409 return Ok(());
410 }
411
412 let count = records.len();
413 let episode_ids = self.sink.ingest(records)?;
414
415 self.stats.records_received += count;
416 self.stats.episodes_created += episode_ids.len();
417
418 tracing::debug!(
419 records = count,
420 episodes = episode_ids.len(),
421 "Processed records"
422 );
423
424 Ok(())
425 }
426
427 async fn check_and_train(&mut self) -> Result<(), DaemonError> {
429 let current_count = self.sink.episode_count();
431 let ctx = TriggerContext::with_count(current_count)
432 .last_train_at(self.stats.last_train_at.unwrap_or(0))
433 .last_train_count(self.last_train_count);
434
435 if !self.trigger.should_train(&ctx).unwrap_or(false) {
436 return Ok(());
437 }
438
439 tracing::info!(
440 episode_count = current_count,
441 trigger = self.trigger.name(),
442 "Trigger fired, starting learning"
443 );
444
445 let result = self
447 .processor
448 .run(self.sink.episode_store().as_ref())
449 .await?;
450
451 self.stats.trainings_completed += 1;
453 self.stats.last_train_at = Some(epoch_millis());
454 self.last_train_count = current_count;
455
456 if let Some(applier) = &mut self.applier {
458 if let Some(model) = result.lora_model() {
459 let apply_result = applier.apply(model).await?;
460 if apply_result.is_applied() {
461 self.stats.models_applied += 1;
462 }
463 }
464 }
465
466 tracing::info!(
467 trainings = self.stats.trainings_completed,
468 models_applied = self.stats.models_applied,
469 "Learning cycle completed"
470 );
471
472 Ok(())
473 }
474
475 pub async fn train_now(&mut self) -> Result<ProcessResult, DaemonError> {
477 tracing::info!("Manual training triggered");
478
479 let result = self
480 .processor
481 .run(self.sink.episode_store().as_ref())
482 .await?;
483
484 self.stats.trainings_completed += 1;
485 self.stats.last_train_at = Some(epoch_millis());
486 self.last_train_count = self.sink.episode_count();
487
488 Ok(result)
489 }
490
491 pub async fn apply_model(&mut self, model: &TrainedModel) -> Result<ApplyResult, DaemonError> {
493 let applier = self
494 .applier
495 .as_mut()
496 .ok_or_else(|| DaemonError::Config("Applier not configured".into()))?;
497
498 let result = applier.apply_now(model).await?;
499 if result.is_applied() {
500 self.stats.models_applied += 1;
501 }
502
503 Ok(result)
504 }
505}
506
/// Fluent builder for a [`LearningDaemon`], allowing the trigger, stores,
/// learn model, and model applicator to be customized; unset components
/// fall back to in-memory/default implementations at build time.
pub struct DaemonBuilder {
    /// Accumulated daemon configuration.
    config: DaemonConfig,
    /// Training trigger; defaults to `TriggerBuilder::default_watch()`.
    trigger: Option<Arc<dyn TrainTrigger>>,
    /// Record store; defaults to an in-memory store.
    record_store: Option<Arc<dyn RecordStore>>,
    /// Episode store; defaults to an in-memory store.
    episode_store: Option<Arc<dyn EpisodeStore>>,
    /// Learn model; defaults to `WorkerDecisionSequenceLearn`.
    learn_model: Option<Arc<dyn LearnModel>>,
    /// Custom model applicator, if supplied.
    applicator: Option<Arc<dyn ModelApplicator>>,
}
520
521impl DaemonBuilder {
522 pub fn new(scenario: impl Into<String>) -> Self {
524 Self {
525 config: DaemonConfig::new(scenario),
526 trigger: None,
527 record_store: None,
528 episode_store: None,
529 learn_model: None,
530 applicator: None,
531 }
532 }
533
534 pub fn data_dir(mut self, path: impl Into<PathBuf>) -> Self {
536 self.config.data_dir = path.into();
537 self
538 }
539
540 pub fn trigger(mut self, trigger: Arc<dyn TrainTrigger>) -> Self {
542 self.trigger = Some(trigger);
543 self
544 }
545
546 pub fn processor_mode(mut self, mode: ProcessorMode) -> Self {
548 self.config.processor_mode = mode;
549 self
550 }
551
552 pub fn auto_apply(mut self) -> Self {
554 self.config.auto_apply = true;
555 self
556 }
557
558 pub fn record_store(mut self, store: Arc<dyn RecordStore>) -> Self {
560 self.record_store = Some(store);
561 self
562 }
563
564 pub fn episode_store(mut self, store: Arc<dyn EpisodeStore>) -> Self {
566 self.episode_store = Some(store);
567 self
568 }
569
570 pub fn learn_model(mut self, model: Arc<dyn LearnModel>) -> Self {
572 self.learn_model = Some(model);
573 self
574 }
575
576 pub fn applicator(mut self, applicator: Arc<dyn ModelApplicator>) -> Self {
578 self.applicator = Some(applicator);
579 self
580 }
581
582 pub fn with_lora(mut self, config: LoraTrainerConfig) -> Self {
584 self.config.lora_config = Some(config);
585 self
586 }
587
588 pub fn build(self) -> Result<LearningDaemon, DaemonError> {
590 let trigger = self
591 .trigger
592 .unwrap_or_else(|| TriggerBuilder::default_watch());
593
594 let record_store = self
595 .record_store
596 .unwrap_or_else(|| Arc::new(InMemoryRecordStore::new()));
597
598 let episode_store = self
599 .episode_store
600 .unwrap_or_else(|| Arc::new(InMemoryEpisodeStore::new()));
601
602 let learn_model = self
603 .learn_model
604 .unwrap_or_else(|| Arc::new(WorkerDecisionSequenceLearn::new()));
605
606 LearningDaemon::with_stores(
607 self.config,
608 trigger,
609 record_store,
610 episode_store,
611 learn_model,
612 )
613 }
614}
615
#[cfg(test)]
mod tests {
    use super::*;
    use crate::events::{ActionContext, ActionEventBuilder, ActionEventResult};
    use crate::learn::trigger::AlwaysTrigger;
    use crate::types::WorkerId;

    /// Builds `count` successful action records for worker 0.
    fn make_test_records(count: usize) -> Vec<Record> {
        (0..count)
            .map(|i| {
                let event = ActionEventBuilder::new(i as u64, WorkerId(0), &format!("Action{}", i))
                    .result(ActionEventResult::success())
                    .duration(std::time::Duration::from_millis(10))
                    .context(ActionContext::new())
                    .build();
                Record::from(&event)
            })
            .collect()
    }

    #[test]
    fn test_daemon_config_builder() {
        let config = DaemonConfig::new("test")
            .data_dir("/tmp/test")
            .check_interval(Duration::from_secs(30))
            .processor_mode(ProcessorMode::Full)
            .auto_apply(true);

        assert_eq!(config.scenario, "test");
        assert_eq!(config.data_dir, PathBuf::from("/tmp/test"));
        assert_eq!(config.check_interval, Duration::from_secs(30));
        assert_eq!(config.processor_mode, ProcessorMode::Full);
        assert!(config.auto_apply);
    }

    #[tokio::test]
    async fn test_daemon_creation() {
        let config = DaemonConfig::new("test");
        let trigger = TriggerBuilder::never();

        let daemon = LearningDaemon::new(config, trigger).unwrap();
        assert_eq!(daemon.config().scenario, "test");
        assert_eq!(daemon.stats().records_received, 0);
    }

    #[tokio::test]
    async fn test_daemon_record_ingestion() {
        let config = DaemonConfig::new("test");
        let trigger = TriggerBuilder::never();
        let mut daemon = LearningDaemon::new(config, trigger).unwrap();
        let sender = daemon.record_sender();

        // Exercise the channel path: previously this test sent a batch that
        // was never drained (the records silently vanished), so the channel
        // was not actually verified. Drain it and ingest the batch.
        sender.send(make_test_records(5)).await.unwrap();
        let queued = daemon.record_rx.recv().await.unwrap();
        daemon.handle_records(queued).await.unwrap();
        assert_eq!(daemon.stats().records_received, 5);

        // Direct ingestion path on top of the channel-delivered batch.
        daemon.handle_records(make_test_records(3)).await.unwrap();
        assert_eq!(daemon.stats().records_received, 8);
    }

    #[tokio::test]
    async fn test_daemon_builder() {
        let daemon = DaemonBuilder::new("test-scenario")
            .data_dir("/tmp/test")
            .trigger(Arc::new(AlwaysTrigger))
            .processor_mode(ProcessorMode::OfflineOnly)
            .build()
            .unwrap();

        assert_eq!(daemon.config().scenario, "test-scenario");
    }
}