Skip to main content

swarm_engine_core/learn/daemon/
mod.rs

1//! Learning Daemon - 継続的学習プロセス
2//!
3//! ## 概要
4//!
5//! Swarm セッションを跨いで動作する長寿命プロセス。
6//! イベントを受信し、データを永続化し、Trigger 発火時に学習を実行する。
7//!
8//! ## アーキテクチャ
9//!
10//! ```text
11//! EventSource → DataSink → Store
12//!                   ↓
13//!             TriggerLoop → Processor → Applier
14//! ```
15//!
16//! ## 使用例
17//!
//! ```ignore
//! use swarm_engine_core::learn::daemon::{DaemonConfig, LearningDaemon};
//! use swarm_engine_core::learn::trigger::TriggerBuilder;
//!
//! let config = DaemonConfig::new("my-scenario").auto_apply(true);
//! let trigger = TriggerBuilder::every_n_episodes(100);
//!
//! let mut daemon = LearningDaemon::new(config, trigger)?;
//! daemon.run().await?;
//! ```
28
29mod applier;
30mod processor;
31mod sink;
32mod subscriber;
33
34pub use applier::{Applier, ApplierConfig, ApplierError, ApplyMode, ApplyResult};
35pub use processor::{ProcessResult, Processor, ProcessorConfig, ProcessorError, ProcessorMode};
36pub use sink::{DataSink, DataSinkError, DataSinkStats};
37pub use subscriber::{ActionEventSubscriber, EventSubscriberConfig, LearningEventSubscriber};
38
39use std::path::PathBuf;
40use std::sync::Arc;
41use std::time::Duration;
42
43use tokio::sync::mpsc;
44use tokio::time::interval;
45
46use crate::learn::learn_model::{LearnModel, WorkerDecisionSequenceLearn};
47use crate::learn::lora::{
48    LoraTrainer, LoraTrainerConfig, ModelApplicator, NoOpApplicator, TrainedModel,
49};
50use crate::learn::record::Record;
51use crate::learn::snapshot::LearningStore;
52use crate::learn::store::{
53    EpisodeStore, FileEpisodeStore, FileRecordStore, InMemoryEpisodeStore, InMemoryRecordStore,
54    RecordStore, RecordStoreError, StoreError,
55};
56use crate::learn::trigger::{TrainTrigger, TriggerBuilder, TriggerContext};
57use crate::util::epoch_millis;
58
59// ============================================================================
60// DaemonError
61// ============================================================================
62
63/// Daemon のエラー型
64#[derive(Debug)]
65pub enum DaemonError {
66    /// DataSink エラー
67    Sink(DataSinkError),
68    /// Processor エラー
69    Processor(ProcessorError),
70    /// Applier エラー
71    Applier(ApplierError),
72    /// IO エラー
73    Io(std::io::Error),
74    /// 設定エラー
75    Config(String),
76    /// Shutdown
77    Shutdown,
78}
79
80impl std::fmt::Display for DaemonError {
81    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
82        match self {
83            Self::Sink(e) => write!(f, "Sink error: {}", e),
84            Self::Processor(e) => write!(f, "Processor error: {}", e),
85            Self::Applier(e) => write!(f, "Applier error: {}", e),
86            Self::Io(e) => write!(f, "IO error: {}", e),
87            Self::Config(msg) => write!(f, "Config error: {}", msg),
88            Self::Shutdown => write!(f, "Daemon shutdown"),
89        }
90    }
91}
92
93impl std::error::Error for DaemonError {}
94
95impl From<DataSinkError> for DaemonError {
96    fn from(e: DataSinkError) -> Self {
97        Self::Sink(e)
98    }
99}
100
101impl From<ProcessorError> for DaemonError {
102    fn from(e: ProcessorError) -> Self {
103        Self::Processor(e)
104    }
105}
106
107impl From<ApplierError> for DaemonError {
108    fn from(e: ApplierError) -> Self {
109        Self::Applier(e)
110    }
111}
112
113impl From<std::io::Error> for DaemonError {
114    fn from(e: std::io::Error) -> Self {
115        Self::Io(e)
116    }
117}
118
119impl From<RecordStoreError> for DaemonError {
120    fn from(e: RecordStoreError) -> Self {
121        Self::Sink(DataSinkError::RecordStore(e))
122    }
123}
124
125impl From<StoreError> for DaemonError {
126    fn from(e: StoreError) -> Self {
127        Self::Sink(DataSinkError::EpisodeStore(e))
128    }
129}
130
131// ============================================================================
132// DaemonConfig
133// ============================================================================
134
135/// Daemon の設定
136#[derive(Debug, Clone)]
137pub struct DaemonConfig {
138    /// シナリオ名
139    pub scenario: String,
140    /// データディレクトリ
141    pub data_dir: PathBuf,
142    /// Trigger チェック間隔
143    pub check_interval: Duration,
144    /// 処理モード
145    pub processor_mode: ProcessorMode,
146    /// Offline 分析に使用するセッション数
147    pub max_sessions: usize,
148    /// 自動適用
149    pub auto_apply: bool,
150    /// LoRA 設定(None の場合は LoRA 学習しない)
151    pub lora_config: Option<LoraTrainerConfig>,
152}
153
154impl DaemonConfig {
155    /// 新しい設定を作成
156    pub fn new(scenario: impl Into<String>) -> Self {
157        Self {
158            scenario: scenario.into(),
159            data_dir: default_data_dir(),
160            check_interval: Duration::from_secs(10),
161            processor_mode: ProcessorMode::OfflineOnly,
162            max_sessions: 20,
163            auto_apply: false,
164            lora_config: None,
165        }
166    }
167
168    /// データディレクトリを設定
169    pub fn data_dir(mut self, path: impl Into<PathBuf>) -> Self {
170        self.data_dir = path.into();
171        self
172    }
173
174    /// Trigger チェック間隔を設定
175    pub fn check_interval(mut self, interval: Duration) -> Self {
176        self.check_interval = interval;
177        self
178    }
179
180    /// 処理モードを設定
181    pub fn processor_mode(mut self, mode: ProcessorMode) -> Self {
182        self.processor_mode = mode;
183        self
184    }
185
186    /// 最大セッション数を設定
187    pub fn max_sessions(mut self, n: usize) -> Self {
188        self.max_sessions = n;
189        self
190    }
191
192    /// 自動適用を設定
193    pub fn auto_apply(mut self, enabled: bool) -> Self {
194        self.auto_apply = enabled;
195        self
196    }
197
198    /// LoRA 設定を追加
199    pub fn with_lora(mut self, config: LoraTrainerConfig) -> Self {
200        self.lora_config = Some(config);
201        self
202    }
203}
204
205fn default_data_dir() -> PathBuf {
206    dirs::data_dir()
207        .unwrap_or_else(|| PathBuf::from("."))
208        .join("swarm-engine")
209        .join("learning")
210}
211
212// ============================================================================
213// DaemonStats
214// ============================================================================
215
/// Counters and timestamps describing what the daemon has done so far.
///
/// Derives `PartialEq`/`Eq` so that two snapshots can be compared directly
/// (for example in tests or when detecting progress between polls).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct DaemonStats {
    /// Number of records received.
    pub records_received: usize,
    /// Number of episodes created.
    pub episodes_created: usize,
    /// Number of training runs completed.
    pub trainings_completed: usize,
    /// Number of models applied.
    pub models_applied: usize,
    /// Time of the most recent training run (Unix timestamp, ms); `None` until the first run.
    pub last_train_at: Option<u64>,
    /// Time the daemon was started (Unix timestamp, ms).
    pub started_at: u64,
}
232
233// ============================================================================
234// LearningDaemon
235// ============================================================================
236
237/// 継続的学習プロセス
238pub struct LearningDaemon {
239    /// 設定
240    config: DaemonConfig,
241    /// DataSink
242    sink: DataSink,
243    /// Trigger
244    trigger: Arc<dyn TrainTrigger>,
245    /// Processor
246    processor: Processor,
247    /// Applier(オプション)
248    applier: Option<Applier>,
249    /// 統計情報
250    stats: DaemonStats,
251    /// 最終学習時の Episode 数
252    last_train_count: usize,
253    /// Record 受信チャンネル
254    record_rx: mpsc::Receiver<Vec<Record>>,
255    /// Record 送信チャンネル(外部公開用)
256    record_tx: mpsc::Sender<Vec<Record>>,
257    /// Shutdown チャンネル
258    shutdown_rx: mpsc::Receiver<()>,
259    /// Shutdown 送信チャンネル(外部公開用)
260    shutdown_tx: mpsc::Sender<()>,
261}
262
263impl LearningDaemon {
264    /// 新しい Daemon を作成(In-Memory Store)
265    pub fn new(config: DaemonConfig, trigger: Arc<dyn TrainTrigger>) -> Result<Self, DaemonError> {
266        let record_store: Arc<dyn RecordStore> = Arc::new(InMemoryRecordStore::new());
267        let episode_store: Arc<dyn EpisodeStore> = Arc::new(InMemoryEpisodeStore::new());
268        let learn_model: Arc<dyn LearnModel> = Arc::new(WorkerDecisionSequenceLearn::new());
269
270        Self::with_stores(config, trigger, record_store, episode_store, learn_model)
271    }
272
273    /// 新しい Daemon を作成(File Store)
274    pub fn with_file_stores(
275        config: DaemonConfig,
276        trigger: Arc<dyn TrainTrigger>,
277    ) -> Result<Self, DaemonError> {
278        // ディレクトリ作成
279        std::fs::create_dir_all(&config.data_dir)?;
280
281        let record_store: Arc<dyn RecordStore> =
282            Arc::new(FileRecordStore::new(config.data_dir.join("records"))?);
283        let episode_store: Arc<dyn EpisodeStore> =
284            Arc::new(FileEpisodeStore::new(config.data_dir.join("episodes"))?);
285        let learn_model: Arc<dyn LearnModel> = Arc::new(WorkerDecisionSequenceLearn::new());
286
287        Self::with_stores(config, trigger, record_store, episode_store, learn_model)
288    }
289
290    /// カスタム Store で Daemon を作成
291    pub fn with_stores(
292        config: DaemonConfig,
293        trigger: Arc<dyn TrainTrigger>,
294        record_store: Arc<dyn RecordStore>,
295        episode_store: Arc<dyn EpisodeStore>,
296        learn_model: Arc<dyn LearnModel>,
297    ) -> Result<Self, DaemonError> {
298        // DataSink
299        let sink = DataSink::new(
300            record_store,
301            Arc::clone(&episode_store),
302            Arc::clone(&learn_model),
303        );
304
305        // Processor
306        let processor_config = ProcessorConfig::new(&config.scenario)
307            .mode(config.processor_mode)
308            .max_sessions(config.max_sessions);
309
310        let mut processor = Processor::new(processor_config);
311
312        // LearningStore for offline analysis
313        let learning_store = LearningStore::new(&config.data_dir)?;
314        processor = processor.with_learning_store(learning_store);
315
316        // LoRA Trainer (if configured)
317        if let Some(lora_config) = &config.lora_config {
318            let trainer = LoraTrainer::new(lora_config.clone(), episode_store);
319            processor = processor
320                .with_lora_trainer(trainer)
321                .with_learn_model(learn_model);
322        }
323
324        // Applier
325        let applier = if config.auto_apply {
326            let applier_config = ApplierConfig::default().auto_apply();
327            // TODO: Real applicator based on config
328            let applicator: Arc<dyn ModelApplicator> = Arc::new(NoOpApplicator::new());
329            Some(Applier::new(applier_config, applicator))
330        } else {
331            None
332        };
333
334        // Channels
335        let (record_tx, record_rx) = mpsc::channel(1000);
336        let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
337
338        Ok(Self {
339            config,
340            sink,
341            trigger,
342            processor,
343            applier,
344            stats: DaemonStats {
345                started_at: epoch_millis(),
346                ..Default::default()
347            },
348            last_train_count: 0,
349            record_rx,
350            record_tx,
351            shutdown_rx,
352            shutdown_tx,
353        })
354    }
355
356    /// Record 送信チャンネルを取得
357    pub fn record_sender(&self) -> mpsc::Sender<Vec<Record>> {
358        self.record_tx.clone()
359    }
360
361    /// Shutdown 送信チャンネルを取得
362    pub fn shutdown_sender(&self) -> mpsc::Sender<()> {
363        self.shutdown_tx.clone()
364    }
365
366    /// 設定を取得
367    pub fn config(&self) -> &DaemonConfig {
368        &self.config
369    }
370
371    /// 統計情報を取得
372    pub fn stats(&self) -> &DaemonStats {
373        &self.stats
374    }
375
376    /// メインループを実行
377    pub async fn run(&mut self) -> Result<(), DaemonError> {
378        tracing::info!(
379            scenario = %self.config.scenario,
380            data_dir = %self.config.data_dir.display(),
381            trigger = self.trigger.name(),
382            "Learning daemon started"
383        );
384
385        let mut check_interval = interval(self.config.check_interval);
386
387        loop {
388            tokio::select! {
389                // Shutdown signal
390                _ = self.shutdown_rx.recv() => {
391                    tracing::info!("Shutdown signal received");
392                    return Ok(());
393                }
394
395                // Record batch received
396                Some(records) = self.record_rx.recv() => {
397                    self.handle_records(records).await?;
398                }
399
400                // Periodic trigger check
401                _ = check_interval.tick() => {
402                    self.check_and_train().await?;
403                }
404            }
405        }
406    }
407
408    /// Record を処理
409    async fn handle_records(&mut self, records: Vec<Record>) -> Result<(), DaemonError> {
410        if records.is_empty() {
411            return Ok(());
412        }
413
414        let count = records.len();
415        let episode_ids = self.sink.ingest(records)?;
416
417        self.stats.records_received += count;
418        self.stats.episodes_created += episode_ids.len();
419
420        tracing::debug!(
421            records = count,
422            episodes = episode_ids.len(),
423            "Processed records"
424        );
425
426        Ok(())
427    }
428
429    /// Trigger をチェックし、必要なら学習を実行
430    async fn check_and_train(&mut self) -> Result<(), DaemonError> {
431        // Trigger 判定用コンテキスト
432        let current_count = self.sink.episode_count();
433        let ctx = TriggerContext::with_count(current_count)
434            .last_train_at(self.stats.last_train_at.unwrap_or(0))
435            .last_train_count(self.last_train_count);
436
437        if !self.trigger.should_train(&ctx).unwrap_or(false) {
438            return Ok(());
439        }
440
441        tracing::info!(
442            episode_count = current_count,
443            trigger = self.trigger.name(),
444            "Trigger fired, starting learning"
445        );
446
447        // 学習実行
448        let result = self
449            .processor
450            .run(self.sink.episode_store().as_ref())
451            .await?;
452
453        // 統計更新
454        self.stats.trainings_completed += 1;
455        self.stats.last_train_at = Some(epoch_millis());
456        self.last_train_count = current_count;
457
458        // 自動適用
459        if let Some(applier) = &mut self.applier {
460            if let Some(model) = result.lora_model() {
461                let apply_result = applier.apply(model).await?;
462                if apply_result.is_applied() {
463                    self.stats.models_applied += 1;
464                }
465            }
466        }
467
468        tracing::info!(
469            trainings = self.stats.trainings_completed,
470            models_applied = self.stats.models_applied,
471            "Learning cycle completed"
472        );
473
474        Ok(())
475    }
476
477    /// 手動で学習を実行(Trigger 無視)
478    pub async fn train_now(&mut self) -> Result<ProcessResult, DaemonError> {
479        tracing::info!("Manual training triggered");
480
481        let result = self
482            .processor
483            .run(self.sink.episode_store().as_ref())
484            .await?;
485
486        self.stats.trainings_completed += 1;
487        self.stats.last_train_at = Some(epoch_millis());
488        self.last_train_count = self.sink.episode_count();
489
490        Ok(result)
491    }
492
493    /// 手動でモデルを適用(非同期)
494    pub async fn apply_model(&mut self, model: &TrainedModel) -> Result<ApplyResult, DaemonError> {
495        let applier = self
496            .applier
497            .as_mut()
498            .ok_or_else(|| DaemonError::Config("Applier not configured".into()))?;
499
500        let result = applier.apply_now(model).await?;
501        if result.is_applied() {
502            self.stats.models_applied += 1;
503        }
504
505        Ok(result)
506    }
507}
508
509// ============================================================================
510// Builder Pattern
511// ============================================================================
512
513/// LearningDaemon のビルダー
514pub struct DaemonBuilder {
515    config: DaemonConfig,
516    trigger: Option<Arc<dyn TrainTrigger>>,
517    record_store: Option<Arc<dyn RecordStore>>,
518    episode_store: Option<Arc<dyn EpisodeStore>>,
519    learn_model: Option<Arc<dyn LearnModel>>,
520    applicator: Option<Arc<dyn ModelApplicator>>,
521}
522
523impl DaemonBuilder {
524    /// 新しいビルダーを作成
525    pub fn new(scenario: impl Into<String>) -> Self {
526        Self {
527            config: DaemonConfig::new(scenario),
528            trigger: None,
529            record_store: None,
530            episode_store: None,
531            learn_model: None,
532            applicator: None,
533        }
534    }
535
536    /// データディレクトリを設定
537    pub fn data_dir(mut self, path: impl Into<PathBuf>) -> Self {
538        self.config.data_dir = path.into();
539        self
540    }
541
542    /// Trigger を設定
543    pub fn trigger(mut self, trigger: Arc<dyn TrainTrigger>) -> Self {
544        self.trigger = Some(trigger);
545        self
546    }
547
548    /// 処理モードを設定
549    pub fn processor_mode(mut self, mode: ProcessorMode) -> Self {
550        self.config.processor_mode = mode;
551        self
552    }
553
554    /// 自動適用を有効化
555    pub fn auto_apply(mut self) -> Self {
556        self.config.auto_apply = true;
557        self
558    }
559
560    /// RecordStore を設定
561    pub fn record_store(mut self, store: Arc<dyn RecordStore>) -> Self {
562        self.record_store = Some(store);
563        self
564    }
565
566    /// EpisodeStore を設定
567    pub fn episode_store(mut self, store: Arc<dyn EpisodeStore>) -> Self {
568        self.episode_store = Some(store);
569        self
570    }
571
572    /// LearnModel を設定
573    pub fn learn_model(mut self, model: Arc<dyn LearnModel>) -> Self {
574        self.learn_model = Some(model);
575        self
576    }
577
578    /// ModelApplicator を設定
579    pub fn applicator(mut self, applicator: Arc<dyn ModelApplicator>) -> Self {
580        self.applicator = Some(applicator);
581        self
582    }
583
584    /// LoRA 設定を追加
585    pub fn with_lora(mut self, config: LoraTrainerConfig) -> Self {
586        self.config.lora_config = Some(config);
587        self
588    }
589
590    /// Daemon をビルド
591    pub fn build(self) -> Result<LearningDaemon, DaemonError> {
592        let trigger = self
593            .trigger
594            .unwrap_or_else(|| TriggerBuilder::default_watch());
595
596        let record_store = self
597            .record_store
598            .unwrap_or_else(|| Arc::new(InMemoryRecordStore::new()));
599
600        let episode_store = self
601            .episode_store
602            .unwrap_or_else(|| Arc::new(InMemoryEpisodeStore::new()));
603
604        let learn_model = self
605            .learn_model
606            .unwrap_or_else(|| Arc::new(WorkerDecisionSequenceLearn::new()));
607
608        LearningDaemon::with_stores(
609            self.config,
610            trigger,
611            record_store,
612            episode_store,
613            learn_model,
614        )
615    }
616}
617
618// ============================================================================
619// Tests
620// ============================================================================
621
#[cfg(test)]
mod tests {
    use super::*;
    use crate::events::{ActionContext, ActionEventBuilder, ActionEventResult};
    use crate::learn::trigger::AlwaysTrigger;
    use crate::types::WorkerId;

    /// Builds `count` records from successful action events named
    /// `Action0`, `Action1`, ..., all attributed to worker 0.
    fn make_test_records(count: usize) -> Vec<Record> {
        let mut records = Vec::with_capacity(count);
        for i in 0..count {
            let action_name = format!("Action{}", i);
            let event = ActionEventBuilder::new(i as u64, WorkerId(0), &action_name)
                .result(ActionEventResult::success())
                .duration(std::time::Duration::from_millis(10))
                .context(ActionContext::new())
                .build();
            records.push(Record::from(&event));
        }
        records
    }

    /// Every builder-style setter on `DaemonConfig` stores the value it was given.
    #[test]
    fn test_daemon_config_builder() {
        let interval = Duration::from_secs(30);
        let config = DaemonConfig::new("test")
            .data_dir("/tmp/test")
            .check_interval(interval)
            .processor_mode(ProcessorMode::Full)
            .auto_apply(true);

        assert_eq!(config.scenario, "test");
        assert_eq!(config.data_dir, PathBuf::from("/tmp/test"));
        assert_eq!(config.check_interval, Duration::from_secs(30));
        assert_eq!(config.processor_mode, ProcessorMode::Full);
        assert!(config.auto_apply);
    }

    /// A freshly created daemon keeps its config and starts with zeroed counters.
    #[tokio::test]
    async fn test_daemon_creation() {
        let daemon = LearningDaemon::new(DaemonConfig::new("test"), TriggerBuilder::never())
            .unwrap();

        assert_eq!(daemon.config().scenario, "test");
        assert_eq!(daemon.stats().records_received, 0);
    }

    /// Only batches passed to `handle_records` are counted.
    #[tokio::test]
    async fn test_daemon_record_ingestion() {
        // The trigger never fires, so no training is started automatically.
        let trigger = TriggerBuilder::never();
        let config = DaemonConfig::new("test");

        let mut daemon = LearningDaemon::new(config, trigger).unwrap();
        let sender = daemon.record_sender();

        // This batch is queued on the channel; the main loop is not running,
        // so it is never consumed.
        let queued = make_test_records(5);
        sender.send(queued).await.unwrap();

        // Process a separate batch directly.
        let direct = make_test_records(3);
        daemon.handle_records(direct).await.unwrap();

        assert_eq!(daemon.stats().records_received, 3);
    }

    /// The builder produces a daemon carrying the requested scenario name.
    #[tokio::test]
    async fn test_daemon_builder() {
        let builder = DaemonBuilder::new("test-scenario")
            .data_dir("/tmp/test")
            .trigger(Arc::new(AlwaysTrigger))
            .processor_mode(ProcessorMode::OfflineOnly);

        let daemon = builder.build().unwrap();

        assert_eq!(daemon.config().scenario, "test-scenario");
    }
}