lab_resource_manager/application/usecases/
notify_resource_usage_changes.rs1use crate::application::ApplicationError;
2use crate::domain::aggregates::resource_usage::entity::ResourceUsage;
3use crate::domain::ports::repositories::ResourceUsageRepository;
4use crate::domain::ports::{NotificationEvent, Notifier};
5use std::collections::HashMap;
6
7pub struct NotifyFutureResourceUsageChangesUseCase<R, N>
18where
19 R: ResourceUsageRepository,
20 N: Notifier,
21{
22 repository: R,
23 notifier: N,
24 previous_state: tokio::sync::Mutex<HashMap<String, ResourceUsage>>,
25}
26
27impl<R, N> NotifyFutureResourceUsageChangesUseCase<R, N>
28where
29 R: ResourceUsageRepository,
30 N: Notifier,
31{
32 pub async fn new(repository: R, notifier: N) -> Result<Self, ApplicationError> {
33 let instance = Self {
34 repository,
35 notifier,
36 previous_state: tokio::sync::Mutex::new(HashMap::new()),
37 };
38
39 let current_usages = instance.fetch_current_usages().await?;
40 *instance.previous_state.lock().await = current_usages;
41
42 Ok(instance)
43 }
44
45 pub async fn poll_once(&self) -> Result<(), ApplicationError> {
46 let current_usages = self.fetch_current_usages().await?;
47 let mut previous_usages = self.previous_state.lock().await;
48
49 self.detect_and_notify_created_usages(&previous_usages, ¤t_usages)
50 .await?;
51 self.detect_and_notify_updated_usages(&previous_usages, ¤t_usages)
52 .await?;
53 self.detect_and_notify_deleted_usages(&previous_usages, ¤t_usages)
54 .await?;
55
56 *previous_usages = current_usages;
57
58 Ok(())
59 }
60
61 async fn fetch_current_usages(
62 &self,
63 ) -> Result<HashMap<String, ResourceUsage>, ApplicationError> {
64 let usages = self.repository.find_future().await?;
65 Ok(usages
66 .into_iter()
67 .map(|usage| (usage.id().as_str().to_string(), usage))
68 .collect())
69 }
70
71 async fn detect_and_notify_created_usages(
72 &self,
73 previous: &HashMap<String, ResourceUsage>,
74 current: &HashMap<String, ResourceUsage>,
75 ) -> Result<(), ApplicationError> {
76 for (id, usage) in current {
77 if !previous.contains_key(id) {
78 self.notify_created(usage.clone()).await?;
79 }
80 }
81 Ok(())
82 }
83
84 async fn detect_and_notify_updated_usages(
85 &self,
86 previous: &HashMap<String, ResourceUsage>,
87 current: &HashMap<String, ResourceUsage>,
88 ) -> Result<(), ApplicationError> {
89 for (id, current_usage) in current {
90 if let Some(previous_usage) = previous.get(id)
91 && previous_usage != current_usage
92 {
93 self.notify_updated(current_usage.clone()).await?;
94 }
95 }
96 Ok(())
97 }
98
99 async fn detect_and_notify_deleted_usages(
100 &self,
101 previous: &HashMap<String, ResourceUsage>,
102 current: &HashMap<String, ResourceUsage>,
103 ) -> Result<(), ApplicationError> {
104 let now = chrono::Utc::now();
105
106 let previous_still_future: HashMap<_, _> = previous
109 .iter()
110 .filter(|(_, usage)| usage.time_period().end() > now)
111 .collect();
112
113 for (id, usage) in previous_still_future {
115 if !current.contains_key(id) {
116 self.notify_deleted(usage.clone()).await?;
117 }
118 }
119 Ok(())
120 }
121
122 async fn notify_created(&self, usage: ResourceUsage) -> Result<(), ApplicationError> {
123 let event = NotificationEvent::ResourceUsageCreated(usage);
124 self.notifier.notify(event).await?;
125 Ok(())
126 }
127
128 async fn notify_updated(&self, usage: ResourceUsage) -> Result<(), ApplicationError> {
129 let event = NotificationEvent::ResourceUsageUpdated(usage);
130 self.notifier.notify(event).await?;
131 Ok(())
132 }
133
134 async fn notify_deleted(&self, usage: ResourceUsage) -> Result<(), ApplicationError> {
135 let event = NotificationEvent::ResourceUsageDeleted(usage);
136 self.notifier.notify(event).await?;
137 Ok(())
138 }
139}