escalon/types/
message.rs

1use std::net::SocketAddr;
2
3use chrono::Utc;
4use serde::{Deserialize, Serialize};
5
6use crate::Escalon;
7use crate::{ClientState, EscalonClient};
8
9#[derive(Clone, Debug, Deserialize, Serialize)]
10pub struct Message {
11    pub action: Action,
12}
13
14#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
15pub enum Action {
16    Join(JoinContent),
17    Check(CheckContent),
18    FoundDead(FoundDeadContent), // for now just the id
19    TakeJobs(TakeJobsContent),
20    Done(DoneContent),
21}
22
23#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
24pub struct JoinContent {
25    pub address: SocketAddr,
26    pub sender_id: String,
27    pub start_time: std::time::SystemTime,
28}
29
30#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
31pub struct CheckContent {
32    pub sender_id: String,
33    pub jobs: usize,
34}
35
36#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
37pub struct FoundDeadContent {
38    pub sender_id: String,
39    pub dead_id: String,
40}
41
42#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
43pub struct TakeJobsContent {
44    pub sender_id: String,
45    pub take_from: String,
46    pub start_at: usize,
47    pub n_jobs: usize,
48}
49
50#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)]
51pub struct DoneContent {
52    pub sender_id: String,
53    pub take_from: String,
54    pub n_jobs: Vec<String>,
55}
56
57impl Message {
58    pub fn new_join(
59        addr: SocketAddr,
60        id: impl Into<String>,
61        start_time: std::time::SystemTime,
62    ) -> Self {
63        Self {
64            action: Action::Join(JoinContent {
65                address: addr,
66                sender_id: id.into(),
67                start_time,
68            }),
69        }
70    }
71
72    // TODO
73    // quizá siempre enviar el join pero comprobar antes de insertar
74    pub async fn handle_join(&self, escalon: &Escalon, content: JoinContent) {
75        if !escalon.clients.lock().unwrap().contains_key(&content.sender_id) {
76            let message = Message::new_join(escalon.address, &escalon.id, escalon.start_time);
77
78            escalon
79                .tx_sender
80                .clone()
81                .unwrap()
82                .send((message, Some(content.address)))
83                .await
84                .unwrap();
85        }
86
87        escalon.clients.lock().unwrap().insert(
88            content.sender_id,
89            EscalonClient {
90                start_time: content.start_time,
91                address: content.address,
92                last_seen: Utc::now().timestamp(),
93                state: ClientState {
94                    jobs: 0,
95                },
96            },
97        );
98    }
99}
100
101impl Message {
102    pub fn new_check(id: String, jobs: usize) -> Self {
103        Self {
104            action: Action::Check(CheckContent {
105                sender_id: id,
106                jobs,
107            }),
108        }
109    }
110
111    // TODO
112    // quizá enviar un join si no está en la lista de clientes
113    pub fn handle_check(&self, escalon: &Escalon, content: CheckContent) {
114        escalon.clients.lock().unwrap().entry(content.sender_id).and_modify(|client| {
115            client.last_seen = Utc::now().timestamp();
116            client.state.jobs = content.jobs;
117        });
118    }
119}
120
121impl Message {
122    pub fn new_found_dead(id: String, dead_id: String) -> Self {
123        Self {
124            action: Action::FoundDead(FoundDeadContent {
125                sender_id: id,
126                dead_id,
127            }),
128        }
129    }
130
131    // TODO
132    // quizá spawn un thread para enviar el join y quitar async de la firma
133    pub async fn handle_found_dead(&self, escalon: &Escalon, content: FoundDeadContent) {
134        // si soy yo enviar un join 
135        if content.dead_id == escalon.id {
136            let message = Message::new_join(escalon.address, &escalon.id, escalon.start_time);
137            escalon.tx_sender.clone().unwrap().send((message, None)).await.unwrap();
138        }
139
140        let clients = match escalon.clients.lock() {
141            Ok(clients) => clients.clone(),
142            Err(_) => {
143                eprintln!("Error getting clients");
144
145                return;
146            }
147        };
148
149        if let Some(sender) = clients.get(&content.sender_id) {
150            if sender.start_time < escalon.start_time {
151                escalon.clients.lock().unwrap().remove(&content.dead_id);
152            }
153        };
154    }
155}
156
157impl Message {
158    pub fn new_take_jobs(
159        id: impl Into<String>,
160        take_from: impl Into<String>,
161        start_at: usize,
162        jobs: usize,
163    ) -> Self {
164        Self {
165            action: Action::TakeJobs(TakeJobsContent {
166                sender_id: id.into(),
167                take_from: take_from.into(),
168                start_at,
169                n_jobs: jobs,
170            }),
171        }
172    }
173
174    pub async fn handle_take_jobs(&self, escalon: &Escalon, content: TakeJobsContent) {
175        let done = escalon
176            .manager
177            .take_jobs(content.take_from.clone(), content.start_at, content.n_jobs)
178            .await
179            .unwrap();
180
181        let clients = escalon.clients.lock().unwrap().clone();
182        let sender = clients.get(&content.sender_id).unwrap();
183        let chunks = done.chunks(50);
184        for chunk in chunks {
185            let message = Message::new_done(
186                escalon.id.clone(),
187                content.take_from.clone(),
188                chunk.to_vec(),
189            );
190
191            escalon
192                .tx_sender
193                .clone()
194                .unwrap()
195                .send((message, Some(sender.address)))
196                .await
197                .unwrap();
198        }
199    }
200}
201
202impl Message {
203    pub fn new_done(
204        id: impl Into<String>,
205        take_from: impl Into<String>,
206        jobs: Vec<String>,
207    ) -> Self {
208        Self {
209            action: Action::Done(DoneContent {
210                sender_id: id.into(),
211                take_from: take_from.into(),
212                n_jobs: jobs,
213            }),
214        }
215    }
216
217    pub async fn handle_done(&self, escalon: &Escalon, content: DoneContent) {
218        // Should manage own jobs and case dead client jobs
219        escalon.manager.drop_jobs(content.n_jobs).await.unwrap();
220
221        let mut temp = escalon.distribution.lock().unwrap();
222        // temp.retain(|(sender, form_client, _start_at, _n_jobs, _done)| {
223        temp.retain(|distrib| {
224            !(*distrib.client_id == content.sender_id
225                && *distrib.take_from == content.take_from)
226        });
227    }
228}