Skip to main content

lab_resource_manager/application/usecases/
notify_future_resource_usage_changes.rs

1use crate::application::ApplicationError;
2use crate::domain::aggregates::resource_usage::entity::ResourceUsage;
3use crate::domain::ports::repositories::ResourceUsageRepository;
4use crate::domain::ports::{NotificationEvent, Notifier};
5use std::collections::HashMap;
6use std::sync::Arc;
7
8/// 未来および進行中のリソース使用状況の変更を監視し、通知するユースケース
9///
10/// このユースケースは以下の変更を検知して通知します:
11/// - 新規作成: 新しいリソース使用予約が追加された
12/// - 更新: 既存の予約内容が変更された
13/// - 削除: **未来の予約**がキャンセル/削除された
14///
15/// # スコープ
16/// このユースケースは「未来および進行中」のリソース使用のみを監視対象とします。
17/// 予約期間が終了したリソースは自然に監視対象外となり、削除通知は送信されません。
18pub struct NotifyFutureResourceUsageChangesUseCase<R, N>
19where
20    R: ResourceUsageRepository,
21    N: Notifier,
22{
23    repository: Arc<R>,
24    notifier: N,
25    previous_state: tokio::sync::Mutex<HashMap<String, ResourceUsage>>,
26}
27
28impl<R, N> NotifyFutureResourceUsageChangesUseCase<R, N>
29where
30    R: ResourceUsageRepository,
31    N: Notifier,
32{
33    /// 新しいインスタンスを作成し、初期状態を取得する
34    ///
35    /// # Arguments
36    /// * `repository` - リソース使用リポジトリ(Arc で共有)
37    /// * `notifier` - 通知サービス
38    ///
39    /// # Errors
40    /// リポジトリから初期状態の取得に失敗した場合
41    pub async fn new(repository: Arc<R>, notifier: N) -> Result<Self, ApplicationError> {
42        let instance = Self {
43            repository,
44            notifier,
45            previous_state: tokio::sync::Mutex::new(HashMap::new()),
46        };
47
48        let current_usages = instance.fetch_current_usages().await?;
49        *instance.previous_state.lock().await = current_usages;
50
51        Ok(instance)
52    }
53
54    /// 一度だけポーリングを実行し、変更を検知して通知する
55    ///
56    /// 前回の状態と現在の状態を比較し、作成・更新・削除された予約を検知して通知します。
57    ///
58    /// # Errors
59    /// リポジトリアクセスまたは通知送信に失敗した場合
60    pub async fn poll_once(&self) -> Result<(), ApplicationError> {
61        let current_usages = self.fetch_current_usages().await?;
62        let mut previous_usages = self.previous_state.lock().await;
63
64        self.detect_and_notify_created_usages(&previous_usages, &current_usages)
65            .await?;
66        self.detect_and_notify_updated_usages(&previous_usages, &current_usages)
67            .await?;
68        self.detect_and_notify_deleted_usages(&previous_usages, &current_usages)
69            .await?;
70
71        *previous_usages = current_usages;
72
73        Ok(())
74    }
75
76    async fn fetch_current_usages(
77        &self,
78    ) -> Result<HashMap<String, ResourceUsage>, ApplicationError> {
79        let usages = self.repository.find_future().await?;
80        Ok(usages
81            .into_iter()
82            .map(|usage| (usage.id().as_str().to_string(), usage))
83            .collect())
84    }
85
86    async fn detect_and_notify_created_usages(
87        &self,
88        previous: &HashMap<String, ResourceUsage>,
89        current: &HashMap<String, ResourceUsage>,
90    ) -> Result<(), ApplicationError> {
91        for (id, usage) in current {
92            if !previous.contains_key(id) {
93                self.notify_created(usage.clone()).await?;
94            }
95        }
96        Ok(())
97    }
98
99    async fn detect_and_notify_updated_usages(
100        &self,
101        previous: &HashMap<String, ResourceUsage>,
102        current: &HashMap<String, ResourceUsage>,
103    ) -> Result<(), ApplicationError> {
104        for (id, current_usage) in current {
105            if let Some(previous_usage) = previous.get(id)
106                && previous_usage != current_usage
107            {
108                self.notify_updated(current_usage.clone()).await?;
109            }
110        }
111        Ok(())
112    }
113
114    async fn detect_and_notify_deleted_usages(
115        &self,
116        previous: &HashMap<String, ResourceUsage>,
117        current: &HashMap<String, ResourceUsage>,
118    ) -> Result<(), ApplicationError> {
119        let now = chrono::Utc::now();
120
121        // previousを現在時刻基準で「まだ未来」のものだけに絞る
122        // (currentと同じ時間軸に合わせることで、自然な期限切れを削除と誤検知しない)
123        let previous_still_future: HashMap<_, _> = previous
124            .iter()
125            .filter(|(_, usage)| usage.time_period().end() > now)
126            .collect();
127
128        // フィルタリング後のpreviousとcurrentを比較
129        for (id, usage) in previous_still_future {
130            if !current.contains_key(id) {
131                self.notify_deleted(usage.clone()).await?;
132            }
133        }
134        Ok(())
135    }
136
137    async fn notify_created(&self, usage: ResourceUsage) -> Result<(), ApplicationError> {
138        let event = NotificationEvent::ResourceUsageCreated(usage);
139        self.notifier.notify(event).await?;
140        Ok(())
141    }
142
143    async fn notify_updated(&self, usage: ResourceUsage) -> Result<(), ApplicationError> {
144        let event = NotificationEvent::ResourceUsageUpdated(usage);
145        self.notifier.notify(event).await?;
146        Ok(())
147    }
148
149    async fn notify_deleted(&self, usage: ResourceUsage) -> Result<(), ApplicationError> {
150        let event = NotificationEvent::ResourceUsageDeleted(usage);
151        self.notifier.notify(event).await?;
152        Ok(())
153    }
154}