Skip to main content

swarm_engine_core/learn/daemon/
mod.rs

1//! Learning Daemon - 継続的学習プロセス
2//!
3//! ## 概要
4//!
5//! Swarm セッションを跨いで動作する長寿命プロセス。
6//! イベントを受信し、データを永続化し、Trigger 発火時に学習を実行する。
7//!
8//! ## アーキテクチャ
9//!
10//! ```text
11//! EventSource → DataSink → Store
12//!                   ↓
13//!             TriggerLoop → Processor → Applier
14//! ```
15//!
16//! ## 使用例
17//!
18//! ```ignore
//! use swarm_engine_core::learn::daemon::{DaemonConfig, LearningDaemon};
//! use swarm_engine_core::learn::trigger::TriggerBuilder;
//!
//! let config = DaemonConfig::new("my-scenario").auto_apply(true);
//! // The trigger is passed to the constructor; `DaemonConfig` has no trigger setter.
//! let trigger = TriggerBuilder::every_n_episodes(100);
//!
//! let mut daemon = LearningDaemon::new(config, trigger)?;
//! daemon.run().await?;
27//! ```
28
29mod applier;
30mod processor;
31mod sink;
32
33pub use applier::{Applier, ApplierConfig, ApplierError, ApplyMode, ApplyResult};
34pub use processor::{ProcessResult, Processor, ProcessorConfig, ProcessorError, ProcessorMode};
35pub use sink::{DataSink, DataSinkError, DataSinkStats};
36
37use std::path::PathBuf;
38use std::sync::Arc;
39use std::time::Duration;
40
41use tokio::sync::mpsc;
42use tokio::time::interval;
43
44use crate::learn::learn_model::{LearnModel, WorkerDecisionSequenceLearn};
45use crate::learn::lora::{
46    LoraTrainer, LoraTrainerConfig, ModelApplicator, NoOpApplicator, TrainedModel,
47};
48use crate::learn::record::Record;
49use crate::learn::snapshot::LearningStore;
50use crate::learn::store::{
51    EpisodeStore, FileEpisodeStore, FileRecordStore, InMemoryEpisodeStore, InMemoryRecordStore,
52    RecordStore, RecordStoreError, StoreError,
53};
54use crate::learn::trigger::{TrainTrigger, TriggerBuilder, TriggerContext};
55use crate::util::epoch_millis;
56
57// ============================================================================
58// DaemonError
59// ============================================================================
60
61/// Daemon のエラー型
62#[derive(Debug)]
63pub enum DaemonError {
64    /// DataSink エラー
65    Sink(DataSinkError),
66    /// Processor エラー
67    Processor(ProcessorError),
68    /// Applier エラー
69    Applier(ApplierError),
70    /// IO エラー
71    Io(std::io::Error),
72    /// 設定エラー
73    Config(String),
74    /// Shutdown
75    Shutdown,
76}
77
78impl std::fmt::Display for DaemonError {
79    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80        match self {
81            Self::Sink(e) => write!(f, "Sink error: {}", e),
82            Self::Processor(e) => write!(f, "Processor error: {}", e),
83            Self::Applier(e) => write!(f, "Applier error: {}", e),
84            Self::Io(e) => write!(f, "IO error: {}", e),
85            Self::Config(msg) => write!(f, "Config error: {}", msg),
86            Self::Shutdown => write!(f, "Daemon shutdown"),
87        }
88    }
89}
90
91impl std::error::Error for DaemonError {}
92
93impl From<DataSinkError> for DaemonError {
94    fn from(e: DataSinkError) -> Self {
95        Self::Sink(e)
96    }
97}
98
99impl From<ProcessorError> for DaemonError {
100    fn from(e: ProcessorError) -> Self {
101        Self::Processor(e)
102    }
103}
104
105impl From<ApplierError> for DaemonError {
106    fn from(e: ApplierError) -> Self {
107        Self::Applier(e)
108    }
109}
110
111impl From<std::io::Error> for DaemonError {
112    fn from(e: std::io::Error) -> Self {
113        Self::Io(e)
114    }
115}
116
117impl From<RecordStoreError> for DaemonError {
118    fn from(e: RecordStoreError) -> Self {
119        Self::Sink(DataSinkError::RecordStore(e))
120    }
121}
122
123impl From<StoreError> for DaemonError {
124    fn from(e: StoreError) -> Self {
125        Self::Sink(DataSinkError::EpisodeStore(e))
126    }
127}
128
129// ============================================================================
130// DaemonConfig
131// ============================================================================
132
133/// Daemon の設定
134#[derive(Debug, Clone)]
135pub struct DaemonConfig {
136    /// シナリオ名
137    pub scenario: String,
138    /// データディレクトリ
139    pub data_dir: PathBuf,
140    /// Trigger チェック間隔
141    pub check_interval: Duration,
142    /// 処理モード
143    pub processor_mode: ProcessorMode,
144    /// Offline 分析に使用するセッション数
145    pub max_sessions: usize,
146    /// 自動適用
147    pub auto_apply: bool,
148    /// LoRA 設定(None の場合は LoRA 学習しない)
149    pub lora_config: Option<LoraTrainerConfig>,
150}
151
152impl DaemonConfig {
153    /// 新しい設定を作成
154    pub fn new(scenario: impl Into<String>) -> Self {
155        Self {
156            scenario: scenario.into(),
157            data_dir: default_data_dir(),
158            check_interval: Duration::from_secs(10),
159            processor_mode: ProcessorMode::OfflineOnly,
160            max_sessions: 20,
161            auto_apply: false,
162            lora_config: None,
163        }
164    }
165
166    /// データディレクトリを設定
167    pub fn data_dir(mut self, path: impl Into<PathBuf>) -> Self {
168        self.data_dir = path.into();
169        self
170    }
171
172    /// Trigger チェック間隔を設定
173    pub fn check_interval(mut self, interval: Duration) -> Self {
174        self.check_interval = interval;
175        self
176    }
177
178    /// 処理モードを設定
179    pub fn processor_mode(mut self, mode: ProcessorMode) -> Self {
180        self.processor_mode = mode;
181        self
182    }
183
184    /// 最大セッション数を設定
185    pub fn max_sessions(mut self, n: usize) -> Self {
186        self.max_sessions = n;
187        self
188    }
189
190    /// 自動適用を設定
191    pub fn auto_apply(mut self, enabled: bool) -> Self {
192        self.auto_apply = enabled;
193        self
194    }
195
196    /// LoRA 設定を追加
197    pub fn with_lora(mut self, config: LoraTrainerConfig) -> Self {
198        self.lora_config = Some(config);
199        self
200    }
201}
202
203fn default_data_dir() -> PathBuf {
204    dirs::data_dir()
205        .unwrap_or_else(|| PathBuf::from("."))
206        .join("swarm-engine")
207        .join("learning")
208}
209
210// ============================================================================
211// DaemonStats
212// ============================================================================
213
/// Counters describing what a [`LearningDaemon`] has done since it was created.
///
/// Timestamps are Unix epoch milliseconds. `PartialEq`/`Eq` allow comparing
/// snapshots (e.g. in tests or change detection).
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct DaemonStats {
    /// Total number of records ingested.
    pub records_received: usize,
    /// Number of episodes the sink produced from ingested records.
    pub episodes_created: usize,
    /// Number of completed learning runs (trigger-driven and manual).
    pub trainings_completed: usize,
    /// Number of models the applier reported as applied.
    pub models_applied: usize,
    /// When the most recent learning run finished; `None` if none has run.
    pub last_train_at: Option<u64>,
    /// When the daemon was constructed (not when `run` was first called).
    pub started_at: u64,
}
230
231// ============================================================================
232// LearningDaemon
233// ============================================================================
234
235/// 継続的学習プロセス
236pub struct LearningDaemon {
237    /// 設定
238    config: DaemonConfig,
239    /// DataSink
240    sink: DataSink,
241    /// Trigger
242    trigger: Arc<dyn TrainTrigger>,
243    /// Processor
244    processor: Processor,
245    /// Applier(オプション)
246    applier: Option<Applier>,
247    /// 統計情報
248    stats: DaemonStats,
249    /// 最終学習時の Episode 数
250    last_train_count: usize,
251    /// Record 受信チャンネル
252    record_rx: mpsc::Receiver<Vec<Record>>,
253    /// Record 送信チャンネル(外部公開用)
254    record_tx: mpsc::Sender<Vec<Record>>,
255    /// Shutdown チャンネル
256    shutdown_rx: mpsc::Receiver<()>,
257    /// Shutdown 送信チャンネル(外部公開用)
258    shutdown_tx: mpsc::Sender<()>,
259}
260
261impl LearningDaemon {
262    /// 新しい Daemon を作成(In-Memory Store)
263    pub fn new(config: DaemonConfig, trigger: Arc<dyn TrainTrigger>) -> Result<Self, DaemonError> {
264        let record_store: Arc<dyn RecordStore> = Arc::new(InMemoryRecordStore::new());
265        let episode_store: Arc<dyn EpisodeStore> = Arc::new(InMemoryEpisodeStore::new());
266        let learn_model: Arc<dyn LearnModel> = Arc::new(WorkerDecisionSequenceLearn::new());
267
268        Self::with_stores(config, trigger, record_store, episode_store, learn_model)
269    }
270
271    /// 新しい Daemon を作成(File Store)
272    pub fn with_file_stores(
273        config: DaemonConfig,
274        trigger: Arc<dyn TrainTrigger>,
275    ) -> Result<Self, DaemonError> {
276        // ディレクトリ作成
277        std::fs::create_dir_all(&config.data_dir)?;
278
279        let record_store: Arc<dyn RecordStore> =
280            Arc::new(FileRecordStore::new(config.data_dir.join("records"))?);
281        let episode_store: Arc<dyn EpisodeStore> =
282            Arc::new(FileEpisodeStore::new(config.data_dir.join("episodes"))?);
283        let learn_model: Arc<dyn LearnModel> = Arc::new(WorkerDecisionSequenceLearn::new());
284
285        Self::with_stores(config, trigger, record_store, episode_store, learn_model)
286    }
287
288    /// カスタム Store で Daemon を作成
289    pub fn with_stores(
290        config: DaemonConfig,
291        trigger: Arc<dyn TrainTrigger>,
292        record_store: Arc<dyn RecordStore>,
293        episode_store: Arc<dyn EpisodeStore>,
294        learn_model: Arc<dyn LearnModel>,
295    ) -> Result<Self, DaemonError> {
296        // DataSink
297        let sink = DataSink::new(
298            record_store,
299            Arc::clone(&episode_store),
300            Arc::clone(&learn_model),
301        );
302
303        // Processor
304        let processor_config = ProcessorConfig::new(&config.scenario)
305            .mode(config.processor_mode)
306            .max_sessions(config.max_sessions);
307
308        let mut processor = Processor::new(processor_config);
309
310        // LearningStore for offline analysis
311        let learning_store = LearningStore::new(&config.data_dir)?;
312        processor = processor.with_learning_store(learning_store);
313
314        // LoRA Trainer (if configured)
315        if let Some(lora_config) = &config.lora_config {
316            let trainer = LoraTrainer::new(lora_config.clone(), episode_store);
317            processor = processor
318                .with_lora_trainer(trainer)
319                .with_learn_model(learn_model);
320        }
321
322        // Applier
323        let applier = if config.auto_apply {
324            let applier_config = ApplierConfig::default().auto_apply();
325            // TODO: Real applicator based on config
326            let applicator: Arc<dyn ModelApplicator> = Arc::new(NoOpApplicator::new());
327            Some(Applier::new(applier_config, applicator))
328        } else {
329            None
330        };
331
332        // Channels
333        let (record_tx, record_rx) = mpsc::channel(1000);
334        let (shutdown_tx, shutdown_rx) = mpsc::channel(1);
335
336        Ok(Self {
337            config,
338            sink,
339            trigger,
340            processor,
341            applier,
342            stats: DaemonStats {
343                started_at: epoch_millis(),
344                ..Default::default()
345            },
346            last_train_count: 0,
347            record_rx,
348            record_tx,
349            shutdown_rx,
350            shutdown_tx,
351        })
352    }
353
354    /// Record 送信チャンネルを取得
355    pub fn record_sender(&self) -> mpsc::Sender<Vec<Record>> {
356        self.record_tx.clone()
357    }
358
359    /// Shutdown 送信チャンネルを取得
360    pub fn shutdown_sender(&self) -> mpsc::Sender<()> {
361        self.shutdown_tx.clone()
362    }
363
364    /// 設定を取得
365    pub fn config(&self) -> &DaemonConfig {
366        &self.config
367    }
368
369    /// 統計情報を取得
370    pub fn stats(&self) -> &DaemonStats {
371        &self.stats
372    }
373
374    /// メインループを実行
375    pub async fn run(&mut self) -> Result<(), DaemonError> {
376        tracing::info!(
377            scenario = %self.config.scenario,
378            data_dir = %self.config.data_dir.display(),
379            trigger = self.trigger.name(),
380            "Learning daemon started"
381        );
382
383        let mut check_interval = interval(self.config.check_interval);
384
385        loop {
386            tokio::select! {
387                // Shutdown signal
388                _ = self.shutdown_rx.recv() => {
389                    tracing::info!("Shutdown signal received");
390                    return Ok(());
391                }
392
393                // Record batch received
394                Some(records) = self.record_rx.recv() => {
395                    self.handle_records(records).await?;
396                }
397
398                // Periodic trigger check
399                _ = check_interval.tick() => {
400                    self.check_and_train().await?;
401                }
402            }
403        }
404    }
405
406    /// Record を処理
407    async fn handle_records(&mut self, records: Vec<Record>) -> Result<(), DaemonError> {
408        if records.is_empty() {
409            return Ok(());
410        }
411
412        let count = records.len();
413        let episode_ids = self.sink.ingest(records)?;
414
415        self.stats.records_received += count;
416        self.stats.episodes_created += episode_ids.len();
417
418        tracing::debug!(
419            records = count,
420            episodes = episode_ids.len(),
421            "Processed records"
422        );
423
424        Ok(())
425    }
426
427    /// Trigger をチェックし、必要なら学習を実行
428    async fn check_and_train(&mut self) -> Result<(), DaemonError> {
429        // Trigger 判定用コンテキスト
430        let current_count = self.sink.episode_count();
431        let ctx = TriggerContext::with_count(current_count)
432            .last_train_at(self.stats.last_train_at.unwrap_or(0))
433            .last_train_count(self.last_train_count);
434
435        if !self.trigger.should_train(&ctx).unwrap_or(false) {
436            return Ok(());
437        }
438
439        tracing::info!(
440            episode_count = current_count,
441            trigger = self.trigger.name(),
442            "Trigger fired, starting learning"
443        );
444
445        // 学習実行
446        let result = self
447            .processor
448            .run(self.sink.episode_store().as_ref())
449            .await?;
450
451        // 統計更新
452        self.stats.trainings_completed += 1;
453        self.stats.last_train_at = Some(epoch_millis());
454        self.last_train_count = current_count;
455
456        // 自動適用
457        if let Some(applier) = &mut self.applier {
458            if let Some(model) = result.lora_model() {
459                let apply_result = applier.apply(model).await?;
460                if apply_result.is_applied() {
461                    self.stats.models_applied += 1;
462                }
463            }
464        }
465
466        tracing::info!(
467            trainings = self.stats.trainings_completed,
468            models_applied = self.stats.models_applied,
469            "Learning cycle completed"
470        );
471
472        Ok(())
473    }
474
475    /// 手動で学習を実行(Trigger 無視)
476    pub async fn train_now(&mut self) -> Result<ProcessResult, DaemonError> {
477        tracing::info!("Manual training triggered");
478
479        let result = self
480            .processor
481            .run(self.sink.episode_store().as_ref())
482            .await?;
483
484        self.stats.trainings_completed += 1;
485        self.stats.last_train_at = Some(epoch_millis());
486        self.last_train_count = self.sink.episode_count();
487
488        Ok(result)
489    }
490
491    /// 手動でモデルを適用(非同期)
492    pub async fn apply_model(&mut self, model: &TrainedModel) -> Result<ApplyResult, DaemonError> {
493        let applier = self
494            .applier
495            .as_mut()
496            .ok_or_else(|| DaemonError::Config("Applier not configured".into()))?;
497
498        let result = applier.apply_now(model).await?;
499        if result.is_applied() {
500            self.stats.models_applied += 1;
501        }
502
503        Ok(result)
504    }
505}
506
507// ============================================================================
508// Builder Pattern
509// ============================================================================
510
511/// LearningDaemon のビルダー
512pub struct DaemonBuilder {
513    config: DaemonConfig,
514    trigger: Option<Arc<dyn TrainTrigger>>,
515    record_store: Option<Arc<dyn RecordStore>>,
516    episode_store: Option<Arc<dyn EpisodeStore>>,
517    learn_model: Option<Arc<dyn LearnModel>>,
518    applicator: Option<Arc<dyn ModelApplicator>>,
519}
520
521impl DaemonBuilder {
522    /// 新しいビルダーを作成
523    pub fn new(scenario: impl Into<String>) -> Self {
524        Self {
525            config: DaemonConfig::new(scenario),
526            trigger: None,
527            record_store: None,
528            episode_store: None,
529            learn_model: None,
530            applicator: None,
531        }
532    }
533
534    /// データディレクトリを設定
535    pub fn data_dir(mut self, path: impl Into<PathBuf>) -> Self {
536        self.config.data_dir = path.into();
537        self
538    }
539
540    /// Trigger を設定
541    pub fn trigger(mut self, trigger: Arc<dyn TrainTrigger>) -> Self {
542        self.trigger = Some(trigger);
543        self
544    }
545
546    /// 処理モードを設定
547    pub fn processor_mode(mut self, mode: ProcessorMode) -> Self {
548        self.config.processor_mode = mode;
549        self
550    }
551
552    /// 自動適用を有効化
553    pub fn auto_apply(mut self) -> Self {
554        self.config.auto_apply = true;
555        self
556    }
557
558    /// RecordStore を設定
559    pub fn record_store(mut self, store: Arc<dyn RecordStore>) -> Self {
560        self.record_store = Some(store);
561        self
562    }
563
564    /// EpisodeStore を設定
565    pub fn episode_store(mut self, store: Arc<dyn EpisodeStore>) -> Self {
566        self.episode_store = Some(store);
567        self
568    }
569
570    /// LearnModel を設定
571    pub fn learn_model(mut self, model: Arc<dyn LearnModel>) -> Self {
572        self.learn_model = Some(model);
573        self
574    }
575
576    /// ModelApplicator を設定
577    pub fn applicator(mut self, applicator: Arc<dyn ModelApplicator>) -> Self {
578        self.applicator = Some(applicator);
579        self
580    }
581
582    /// LoRA 設定を追加
583    pub fn with_lora(mut self, config: LoraTrainerConfig) -> Self {
584        self.config.lora_config = Some(config);
585        self
586    }
587
588    /// Daemon をビルド
589    pub fn build(self) -> Result<LearningDaemon, DaemonError> {
590        let trigger = self
591            .trigger
592            .unwrap_or_else(|| TriggerBuilder::default_watch());
593
594        let record_store = self
595            .record_store
596            .unwrap_or_else(|| Arc::new(InMemoryRecordStore::new()));
597
598        let episode_store = self
599            .episode_store
600            .unwrap_or_else(|| Arc::new(InMemoryEpisodeStore::new()));
601
602        let learn_model = self
603            .learn_model
604            .unwrap_or_else(|| Arc::new(WorkerDecisionSequenceLearn::new()));
605
606        LearningDaemon::with_stores(
607            self.config,
608            trigger,
609            record_store,
610            episode_store,
611            learn_model,
612        )
613    }
614}
615
616// ============================================================================
617// Tests
618// ============================================================================
619
#[cfg(test)]
mod tests {
    use super::*;
    use crate::events::{ActionContext, ActionEventBuilder, ActionEventResult};
    use crate::learn::trigger::AlwaysTrigger;
    use crate::types::WorkerId;

    /// Builds `count` successful action records with distinct action names.
    fn make_test_records(count: usize) -> Vec<Record> {
        (0..count)
            .map(|i| {
                let event = ActionEventBuilder::new(i as u64, WorkerId(0), &format!("Action{}", i))
                    .result(ActionEventResult::success())
                    .duration(std::time::Duration::from_millis(10))
                    .context(ActionContext::new())
                    .build();
                Record::from(&event)
            })
            .collect()
    }

    /// Per-test scratch directory under the OS temp dir.
    ///
    /// This keeps tests out of the user's real data directory and avoids a
    /// hard-coded `/tmp`.
    fn scratch_dir(name: &str) -> PathBuf {
        std::env::temp_dir()
            .join("swarm-engine-daemon-tests")
            .join(name)
    }

    #[test]
    fn test_daemon_config_builder() {
        let config = DaemonConfig::new("test")
            .data_dir("/tmp/test")
            .check_interval(Duration::from_secs(30))
            .processor_mode(ProcessorMode::Full)
            .max_sessions(5)
            .auto_apply(true);

        assert_eq!(config.scenario, "test");
        assert_eq!(config.data_dir, PathBuf::from("/tmp/test"));
        assert_eq!(config.check_interval, Duration::from_secs(30));
        assert_eq!(config.processor_mode, ProcessorMode::Full);
        assert_eq!(config.max_sessions, 5);
        assert!(config.auto_apply);
        assert!(config.lora_config.is_none());
    }

    #[test]
    fn test_daemon_error_display() {
        assert_eq!(
            DaemonError::Config("bad".into()).to_string(),
            "Config error: bad"
        );
        assert_eq!(DaemonError::Shutdown.to_string(), "Daemon shutdown");
    }

    #[tokio::test]
    async fn test_daemon_creation() {
        let config = DaemonConfig::new("test").data_dir(scratch_dir("creation"));
        let trigger = TriggerBuilder::never();

        let daemon = LearningDaemon::new(config, trigger).unwrap();
        assert_eq!(daemon.config().scenario, "test");
        assert_eq!(daemon.stats().records_received, 0);
        assert_eq!(daemon.stats().trainings_completed, 0);
    }

    #[tokio::test]
    async fn test_daemon_record_ingestion() {
        let config = DaemonConfig::new("test").data_dir(scratch_dir("ingestion"));
        let trigger = TriggerBuilder::never(); // Never auto-train

        let mut daemon = LearningDaemon::new(config, trigger).unwrap();
        let sender = daemon.record_sender();

        // A batch sent through the public channel reaches the daemon.
        sender.send(make_test_records(5)).await.unwrap();
        let batch = daemon
            .record_rx
            .recv()
            .await
            .expect("batch should be queued");
        daemon.handle_records(batch).await.unwrap();
        assert_eq!(daemon.stats().records_received, 5);

        // Batches handed to the handler directly accumulate.
        daemon.handle_records(make_test_records(3)).await.unwrap();
        assert_eq!(daemon.stats().records_received, 8);

        // Empty batches are ignored.
        daemon.handle_records(Vec::new()).await.unwrap();
        assert_eq!(daemon.stats().records_received, 8);
    }

    #[tokio::test]
    async fn test_daemon_run_stops_on_shutdown() {
        let config = DaemonConfig::new("test").data_dir(scratch_dir("shutdown"));
        let mut daemon = LearningDaemon::new(config, TriggerBuilder::never()).unwrap();

        daemon.shutdown_sender().send(()).await.unwrap();
        daemon.run().await.unwrap();
        assert_eq!(daemon.stats().trainings_completed, 0);
    }

    #[tokio::test]
    async fn test_daemon_builder() {
        let daemon = DaemonBuilder::new("test-scenario")
            .data_dir(scratch_dir("builder"))
            .trigger(Arc::new(AlwaysTrigger))
            .processor_mode(ProcessorMode::OfflineOnly)
            .build()
            .unwrap();

        assert_eq!(daemon.config().scenario, "test-scenario");
        assert_eq!(daemon.config().processor_mode, ProcessorMode::OfflineOnly);
    }
}