lab_resource_manager/application/usecases/
notify_future_resource_usage_changes.rs1use crate::application::ApplicationError;
2use crate::domain::aggregates::resource_usage::entity::ResourceUsage;
3use crate::domain::ports::repositories::ResourceUsageRepository;
4use crate::domain::ports::{NotificationEvent, Notifier};
5use std::collections::HashMap;
6use std::sync::Arc;
7
8pub struct NotifyFutureResourceUsageChangesUseCase<R, N>
19where
20 R: ResourceUsageRepository,
21 N: Notifier,
22{
23 repository: Arc<R>,
24 notifier: N,
25 previous_state: tokio::sync::Mutex<HashMap<String, ResourceUsage>>,
26}
27
28impl<R, N> NotifyFutureResourceUsageChangesUseCase<R, N>
29where
30 R: ResourceUsageRepository,
31 N: Notifier,
32{
33 pub async fn new(repository: Arc<R>, notifier: N) -> Result<Self, ApplicationError> {
42 let instance = Self {
43 repository,
44 notifier,
45 previous_state: tokio::sync::Mutex::new(HashMap::new()),
46 };
47
48 let current_usages = instance.fetch_current_usages().await?;
49 *instance.previous_state.lock().await = current_usages;
50
51 Ok(instance)
52 }
53
54 pub async fn poll_once(&self) -> Result<(), ApplicationError> {
61 let current_usages = self.fetch_current_usages().await?;
62 let mut previous_usages = self.previous_state.lock().await;
63
64 self.detect_and_notify_created_usages(&previous_usages, ¤t_usages)
65 .await?;
66 self.detect_and_notify_updated_usages(&previous_usages, ¤t_usages)
67 .await?;
68 self.detect_and_notify_deleted_usages(&previous_usages, ¤t_usages)
69 .await?;
70
71 *previous_usages = current_usages;
72
73 Ok(())
74 }
75
76 async fn fetch_current_usages(
77 &self,
78 ) -> Result<HashMap<String, ResourceUsage>, ApplicationError> {
79 let usages = self.repository.find_future().await?;
80 Ok(usages
81 .into_iter()
82 .map(|usage| (usage.id().as_str().to_string(), usage))
83 .collect())
84 }
85
86 async fn detect_and_notify_created_usages(
87 &self,
88 previous: &HashMap<String, ResourceUsage>,
89 current: &HashMap<String, ResourceUsage>,
90 ) -> Result<(), ApplicationError> {
91 for (id, usage) in current {
92 if !previous.contains_key(id) {
93 self.notify_created(usage.clone()).await?;
94 }
95 }
96 Ok(())
97 }
98
99 async fn detect_and_notify_updated_usages(
100 &self,
101 previous: &HashMap<String, ResourceUsage>,
102 current: &HashMap<String, ResourceUsage>,
103 ) -> Result<(), ApplicationError> {
104 for (id, current_usage) in current {
105 if let Some(previous_usage) = previous.get(id)
106 && previous_usage != current_usage
107 {
108 self.notify_updated(current_usage.clone()).await?;
109 }
110 }
111 Ok(())
112 }
113
114 async fn detect_and_notify_deleted_usages(
115 &self,
116 previous: &HashMap<String, ResourceUsage>,
117 current: &HashMap<String, ResourceUsage>,
118 ) -> Result<(), ApplicationError> {
119 let now = chrono::Utc::now();
120
121 let previous_still_future: HashMap<_, _> = previous
124 .iter()
125 .filter(|(_, usage)| usage.time_period().end() > now)
126 .collect();
127
128 for (id, usage) in previous_still_future {
130 if !current.contains_key(id) {
131 self.notify_deleted(usage.clone()).await?;
132 }
133 }
134 Ok(())
135 }
136
137 async fn notify_created(&self, usage: ResourceUsage) -> Result<(), ApplicationError> {
138 let event = NotificationEvent::ResourceUsageCreated(usage);
139 self.notifier.notify(event).await?;
140 Ok(())
141 }
142
143 async fn notify_updated(&self, usage: ResourceUsage) -> Result<(), ApplicationError> {
144 let event = NotificationEvent::ResourceUsageUpdated(usage);
145 self.notifier.notify(event).await?;
146 Ok(())
147 }
148
149 async fn notify_deleted(&self, usage: ResourceUsage) -> Result<(), ApplicationError> {
150 let event = NotificationEvent::ResourceUsageDeleted(usage);
151 self.notifier.notify(event).await?;
152 Ok(())
153 }
154}