funpay_client/client/
poller.rs

1use crate::client::FunpayGateway;
2use crate::error::FunPayError;
3use crate::events::Event;
4use crate::models::ids::{ChatId, OrderId};
5use crate::models::{ChatShortcut, Message, OrderShortcut};
6use crate::parsing::{parse_message_html, parse_orders_list};
7use crate::storage::StateStorage;
8use log::debug;
9use scraper::{Html, Selector};
10use serde_json::{json, to_string, Value};
11use std::collections::HashMap;
12use std::sync::Arc;
13use std::time::Duration;
14use tokio::sync::broadcast::Sender;
15use tokio::time::sleep;
16
17pub struct FunPayPoller {
18    pub gateway: Arc<dyn FunpayGateway>,
19    pub golden_key: String,
20    pub user_agent: String,
21    pub id: i64,
22    pub username: Option<String>,
23    pub csrf_token: String,
24    pub phpsessid: Option<String>,
25    pub events_tx: Sender<Event>,
26    pub storage: Arc<dyn StateStorage>,
27    pub polling_interval: Duration,
28    pub error_retry_delay: Duration,
29
30    // State
31    pub last_msg_event_tag: String,
32    pub last_order_event_tag: String,
33    pub last_messages: HashMap<i64, (i64, i64, Option<String>)>,
34    pub last_messages_ids: HashMap<i64, i64>,
35    pub saved_orders: HashMap<OrderId, OrderShortcut>,
36}
37
38impl FunPayPoller {
39    pub async fn start(mut self) -> Result<(), FunPayError> {
40        self.load_last_messages_ids().await;
41        debug!(
42            target: "funpay_client",
43            "Starting polling loop for {}",
44            self.username.clone().unwrap_or_default()
45        );
46
47        let mut first = true;
48        loop {
49            let orders = json!({
50                "type": "orders_counters",
51                "id": self.id,
52                "tag": self.last_order_event_tag,
53                "data": false
54            });
55            let chats = json!({
56                "type": "chat_bookmarks",
57                "id": self.id,
58                "tag": self.last_msg_event_tag,
59                "data": false
60            });
61            let objects_json = to_string(&json!([orders, chats])).unwrap();
62
63            let updates = match self.post_runner(objects_json).await {
64                Ok(updates) => updates,
65                Err(e) => {
66                    log::error!(target: "funpay_client", "HTTP request failed: {e}. Retrying in {:?}...", self.error_retry_delay);
67                    sleep(self.error_retry_delay).await;
68                    continue;
69                }
70            };
71
72            let (evs, changed_chats) = self.parse_events_from_updates(&updates, first);
73            for ev in evs {
74                let _ = self.events_tx.send(ev);
75            }
76
77            let mut persist_required = false;
78            if !changed_chats.is_empty() {
79                match self.fetch_chats_histories(&changed_chats).await {
80                    Ok(mut histories) => {
81                        for (cid, mut msgs) in histories.drain() {
82                            if let Some(last_id) = self.last_messages_ids.get(&cid).copied() {
83                                msgs.retain(|m| m.id > last_id);
84                            }
85                            if let Some(max_id) = msgs.iter().map(|m| m.id).max() {
86                                let prev = self.last_messages_ids.insert(cid, max_id);
87                                if prev != Some(max_id) {
88                                    persist_required = true;
89                                }
90                            }
91                            if !first {
92                                for m in msgs {
93                                    let _ = self.events_tx.send(Event::NewMessage { message: m });
94                                }
95                            }
96                        }
97                    }
98                    Err(e) => {
99                        log::error!(target: "funpay_client", "Failed to fetch chat histories: {e}");
100                    }
101                }
102            }
103
104            if persist_required {
105                self.persist_last_messages_ids().await;
106            }
107
108            match self.fetch_sales_list().await {
109                Ok(list) => {
110                    let mut new_map: HashMap<OrderId, OrderShortcut> = HashMap::new();
111                    for o in list.into_iter() {
112                        new_map.insert(o.id.clone(), o);
113                    }
114                    if self.saved_orders.is_empty() {
115                        for order in new_map.values() {
116                            let _ = self.events_tx.send(Event::InitialOrder {
117                                order: order.clone(),
118                            });
119                        }
120                    } else {
121                        for (id, order) in new_map.iter() {
122                            if let Some(prev) = self.saved_orders.get(id) {
123                                if prev.status != order.status {
124                                    let _ = self.events_tx.send(Event::OrderStatusChanged {
125                                        order: order.clone(),
126                                    });
127                                }
128                            } else {
129                                let _ = self.events_tx.send(Event::NewOrder {
130                                    order: order.clone(),
131                                });
132                                if order.status == crate::models::enums::OrderStatus::Closed {
133                                    let _ = self.events_tx.send(Event::OrderStatusChanged {
134                                        order: order.clone(),
135                                    });
136                                }
137                            }
138                        }
139                    }
140                    self.saved_orders = new_map;
141                }
142                Err(e) => {
143                    log::error!(target: "funpay_client", "Failed to fetch sales list: {e}");
144                }
145            }
146
147            first = false;
148            sleep(self.polling_interval).await;
149        }
150    }
151
152    async fn post_runner(&self, objects_json: String) -> Result<Value, FunPayError> {
153        self.gateway
154            .post_runner(
155                &self.golden_key,
156                &self.user_agent,
157                &self.csrf_token,
158                self.phpsessid.as_deref(),
159                &objects_json,
160                None,
161            )
162            .await
163    }
164
165    fn parse_events_from_updates(
166        &mut self,
167        updates: &Value,
168        first: bool,
169    ) -> (Vec<Event>, Vec<(i64, Option<String>)>) {
170        let mut events = Vec::new();
171        let mut changed_chats: Vec<ChatShortcut> = Vec::new();
172        let objects = updates
173            .get("objects")
174            .and_then(|x| x.as_array())
175            .cloned()
176            .unwrap_or_default();
177        for obj in objects {
178            let typ = obj.get("type").and_then(|x| x.as_str()).unwrap_or("");
179            if typ == "chat_bookmarks" {
180                if let Some(tag) = obj.get("tag").and_then(|x| x.as_str()) {
181                    self.last_msg_event_tag = tag.to_string();
182                }
183                let html = obj
184                    .get("data")
185                    .and_then(|x| x.get("html"))
186                    .and_then(|x| x.as_str())
187                    .unwrap_or("");
188                if html.is_empty() {
189                    continue;
190                }
191                let chats = self.parse_chat_bookmarks(html);
192                if first {
193                    for ch in chats {
194                        events.push(Event::InitialChat { chat: ch.clone() });
195                        if ch.node_msg_id > 0 {
196                            changed_chats.push(ch);
197                        }
198                    }
199                } else {
200                    if !chats.is_empty() {
201                        events.push(Event::ChatsListChanged);
202                    }
203                    for ch in chats {
204                        let prev = self
205                            .last_messages
206                            .get(&ch.id)
207                            .cloned()
208                            .unwrap_or((-1, -1, None));
209                        if ch.node_msg_id > prev.0 {
210                            events.push(Event::LastChatMessageChanged { chat: ch.clone() });
211                            changed_chats.push(ch.clone());
212                        }
213                        self.last_messages.insert(
214                            ch.id,
215                            (ch.node_msg_id, ch.user_msg_id, ch.last_message_text.clone()),
216                        );
217                    }
218                }
219            } else if typ == "orders_counters" {
220                if let Some(tag) = obj.get("tag").and_then(|x| x.as_str()) {
221                    self.last_order_event_tag = tag.to_string();
222                }
223                let purchases = obj
224                    .get("data")
225                    .and_then(|x| x.get("buyer"))
226                    .and_then(|x| x.as_i64())
227                    .unwrap_or(0) as i32;
228                let sales = obj
229                    .get("data")
230                    .and_then(|x| x.get("seller"))
231                    .and_then(|x| x.as_i64())
232                    .unwrap_or(0) as i32;
233                events.push(Event::OrdersListChanged { purchases, sales });
234            }
235        }
236        let chats_data: Vec<(i64, Option<String>)> = changed_chats
237            .into_iter()
238            .map(|c| (c.id, Some(c.name)))
239            .collect();
240        (events, chats_data)
241    }
242
243    fn parse_chat_bookmarks(&mut self, html: &str) -> Vec<ChatShortcut> {
244        let doc = Html::parse_fragment(html);
245        let sel_chat = Selector::parse("a.contact-item").unwrap();
246        let sel_msg = Selector::parse("div.contact-item-message").unwrap();
247        let sel_name = Selector::parse("div.media-user-name").unwrap();
248        let mut out = Vec::new();
249        for el in doc.select(&sel_chat) {
250            let id_attr = el.value().attr("data-id").unwrap_or("0");
251            let id = id_attr.parse::<i64>().unwrap_or(0);
252            let node_msg_id = el
253                .value()
254                .attr("data-node-msg")
255                .unwrap_or("0")
256                .parse::<i64>()
257                .unwrap_or(0);
258            let user_msg_id = el
259                .value()
260                .attr("data-user-msg")
261                .unwrap_or("0")
262                .parse::<i64>()
263                .unwrap_or(0);
264            let unread = el.value().classes().any(|c| c == "unread");
265            let last_message_text = el
266                .select(&sel_msg)
267                .next()
268                .map(|n| n.text().collect::<String>());
269            let name = el
270                .select(&sel_name)
271                .next()
272                .map(|n| n.text().collect::<String>())
273                .unwrap_or_default();
274            out.push(ChatShortcut {
275                id,
276                name,
277                last_message_text,
278                node_msg_id,
279                user_msg_id,
280                unread,
281            });
282        }
283        out
284    }
285
286    async fn fetch_sales_list(&self) -> Result<Vec<OrderShortcut>, FunPayError> {
287        let body = self
288            .gateway
289            .get_orders_trade(&self.golden_key, &self.user_agent)
290            .await?;
291        parse_orders_list(&body, self.id)
292    }
293
294    async fn fetch_chats_histories(
295        &self,
296        chats_data: &[(i64, Option<String>)],
297    ) -> Result<HashMap<i64, Vec<Message>>, FunPayError> {
298        let mut objects = Vec::with_capacity(chats_data.len());
299        for (chat_id, _name) in chats_data.iter() {
300            objects.push(json!({
301                "type": "chat_node",
302                "id": chat_id,
303                "tag": "00000000",
304                "data": {"node": chat_id, "last_message": -1, "content": ""}
305            }));
306        }
307        let objects_json = to_string(&objects).unwrap();
308        let res = self.post_runner(objects_json).await?;
309        let mut out: HashMap<i64, Vec<Message>> = HashMap::new();
310        let objects = res
311            .get("objects")
312            .and_then(|x| x.as_array())
313            .cloned()
314            .unwrap_or_default();
315        for obj in objects {
316            if obj.get("type").and_then(|x| x.as_str()) != Some("chat_node") {
317                continue;
318            }
319            let id = obj.get("id").and_then(|x| x.as_i64()).unwrap_or(0);
320            let data = obj.get("data");
321            if data.is_none() {
322                out.insert(id, Vec::new());
323                continue;
324            }
325            let data = data.unwrap();
326            let messages = data
327                .get("messages")
328                .and_then(|x| x.as_array())
329                .cloned()
330                .unwrap_or_default();
331            let mut list = Vec::new();
332            for m in messages {
333                let mid = m.get("id").and_then(|x| x.as_i64()).unwrap_or(0);
334                let author_id = m.get("author").and_then(|x| x.as_i64()).unwrap_or(0);
335                let html = m.get("html").and_then(|x| x.as_str()).unwrap_or("");
336                let (text, _image) = parse_message_html(html);
337                list.push(Message {
338                    id: mid,
339                    chat_id: ChatId::from(format!("{id}")),
340                    chat_name: chats_data
341                        .iter()
342                        .find(|(cid, _)| *cid == id)
343                        .and_then(|(_, n)| n.clone()),
344                    text,
345                    interlocutor_id: None,
346                    author_id,
347                });
348            }
349            out.insert(id, list);
350        }
351        Ok(out)
352    }
353
354    async fn load_last_messages_ids(&mut self) {
355        match self.storage.load().await {
356            Ok(stored) => {
357                self.last_messages_ids = stored;
358            }
359            Err(e) => {
360                log::error!(target: "funpay_client", "Failed to load last messages store: {e}");
361            }
362        }
363    }
364
365    async fn persist_last_messages_ids(&self) {
366        if let Err(e) = self.storage.save(&self.last_messages_ids).await {
367            log::error!(target: "funpay_client", "Failed to persist last messages ids: {e}");
368        }
369    }
370}