1use std::net::SocketAddr;
2
3use chrono::Utc;
4use serde::{Deserialize, Serialize};
5
6use crate::Escalon;
7use crate::{ClientState, EscalonClient};
8
9#[derive(Clone, Debug, Deserialize, Serialize)]
10pub struct Message {
11 pub action: Action,
12}
13
14#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
15pub enum Action {
16 Join(JoinContent),
17 Check(CheckContent),
18 FoundDead(FoundDeadContent), TakeJobs(TakeJobsContent),
20 Done(DoneContent),
21}
22
23#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
24pub struct JoinContent {
25 pub address: SocketAddr,
26 pub sender_id: String,
27 pub start_time: std::time::SystemTime,
28}
29
30#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
31pub struct CheckContent {
32 pub sender_id: String,
33 pub jobs: usize,
34}
35
36#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
37pub struct FoundDeadContent {
38 pub sender_id: String,
39 pub dead_id: String,
40}
41
42#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
43pub struct TakeJobsContent {
44 pub sender_id: String,
45 pub take_from: String,
46 pub start_at: usize,
47 pub n_jobs: usize,
48}
49
50#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
51pub struct DoneContent {
52 pub sender_id: String,
53 pub take_from: String,
54 pub n_jobs: Vec<String>,
55}
56
57impl Message {
58 pub fn new_join(
59 addr: SocketAddr,
60 id: impl Into<String>,
61 start_time: std::time::SystemTime,
62 ) -> Self {
63 Self {
64 action: Action::Join(JoinContent {
65 address: addr,
66 sender_id: id.into(),
67 start_time,
68 }),
69 }
70 }
71
72 pub async fn handle_join(&self, escalon: &Escalon, content: JoinContent) {
75 if !escalon.clients.lock().unwrap().contains_key(&content.sender_id) {
76 let message = Message::new_join(escalon.address, &escalon.id, escalon.start_time);
77
78 escalon
79 .tx_sender
80 .clone()
81 .unwrap()
82 .send((message, Some(content.address)))
83 .await
84 .unwrap();
85 }
86
87 escalon.clients.lock().unwrap().insert(
88 content.sender_id,
89 EscalonClient {
90 start_time: content.start_time,
91 address: content.address,
92 last_seen: Utc::now().timestamp(),
93 state: ClientState {
94 jobs: 0,
95 },
96 },
97 );
98 }
99}
100
101impl Message {
102 pub fn new_check(id: String, jobs: usize) -> Self {
103 Self {
104 action: Action::Check(CheckContent {
105 sender_id: id,
106 jobs,
107 }),
108 }
109 }
110
111 pub fn handle_check(&self, escalon: &Escalon, content: CheckContent) {
114 escalon.clients.lock().unwrap().entry(content.sender_id).and_modify(|client| {
115 client.last_seen = Utc::now().timestamp();
116 client.state.jobs = content.jobs;
117 });
118 }
119}
120
121impl Message {
122 pub fn new_found_dead(id: String, dead_id: String) -> Self {
123 Self {
124 action: Action::FoundDead(FoundDeadContent {
125 sender_id: id,
126 dead_id,
127 }),
128 }
129 }
130
131 pub async fn handle_found_dead(&self, escalon: &Escalon, content: FoundDeadContent) {
134 if content.dead_id == escalon.id {
136 let message = Message::new_join(escalon.address, &escalon.id, escalon.start_time);
137 escalon.tx_sender.clone().unwrap().send((message, None)).await.unwrap();
138 }
139
140 let clients = match escalon.clients.lock() {
141 Ok(clients) => clients.clone(),
142 Err(_) => {
143 eprintln!("Error getting clients");
144
145 return;
146 }
147 };
148
149 if let Some(sender) = clients.get(&content.sender_id) {
150 if sender.start_time < escalon.start_time {
151 escalon.clients.lock().unwrap().remove(&content.dead_id);
152 }
153 };
154 }
155}
156
157impl Message {
158 pub fn new_take_jobs(
159 id: impl Into<String>,
160 take_from: impl Into<String>,
161 start_at: usize,
162 jobs: usize,
163 ) -> Self {
164 Self {
165 action: Action::TakeJobs(TakeJobsContent {
166 sender_id: id.into(),
167 take_from: take_from.into(),
168 start_at,
169 n_jobs: jobs,
170 }),
171 }
172 }
173
174 pub async fn handle_take_jobs(&self, escalon: &Escalon, content: TakeJobsContent) {
175 let done = escalon
176 .manager
177 .take_jobs(content.take_from.clone(), content.start_at, content.n_jobs)
178 .await
179 .unwrap();
180
181 let clients = escalon.clients.lock().unwrap().clone();
182 let sender = clients.get(&content.sender_id).unwrap();
183 let chunks = done.chunks(50);
184 for chunk in chunks {
185 let message = Message::new_done(
186 escalon.id.clone(),
187 content.take_from.clone(),
188 chunk.to_vec(),
189 );
190
191 escalon
192 .tx_sender
193 .clone()
194 .unwrap()
195 .send((message, Some(sender.address)))
196 .await
197 .unwrap();
198 }
199 }
200}
201
202impl Message {
203 pub fn new_done(
204 id: impl Into<String>,
205 take_from: impl Into<String>,
206 jobs: Vec<String>,
207 ) -> Self {
208 Self {
209 action: Action::Done(DoneContent {
210 sender_id: id.into(),
211 take_from: take_from.into(),
212 n_jobs: jobs,
213 }),
214 }
215 }
216
217 pub async fn handle_done(&self, escalon: &Escalon, content: DoneContent) {
218 escalon.manager.drop_jobs(content.n_jobs).await.unwrap();
220
221 let mut temp = escalon.distribution.lock().unwrap();
222 temp.retain(|distrib| {
224 !(*distrib.client_id == content.sender_id
225 && *distrib.take_from == content.take_from)
226 });
227 }
228}