1use crate::client::FunpayGateway;
2use crate::error::FunPayError;
3use crate::events::Event;
4use crate::models::ids::{ChatId, OrderId};
5use crate::models::{ChatShortcut, Message, OrderShortcut};
6use crate::parsing::{parse_message_html, parse_orders_list};
7use crate::storage::StateStorage;
8use log::debug;
9use scraper::{Html, Selector};
10use serde_json::{json, to_string, Value};
11use std::collections::HashMap;
12use std::sync::Arc;
13use std::time::Duration;
14use tokio::sync::broadcast::Sender;
15use tokio::time::sleep;
16
/// Polling worker for a single FunPay account.
///
/// Holds the credentials handed to the gateway on each request, the broadcast channel on
/// which the resulting [`Event`]s are published, and the bookkeeping used to tell new chat
/// messages and orders apart from ones that were already reported.
pub struct FunPayPoller {
    /// Transport used for every request made by the poller (`post_runner`, `get_orders_trade`).
    pub gateway: Arc<dyn FunpayGateway>,
    /// Passed to the gateway with every request.
    /// NOTE(review): presumably the account's auth cookie value — confirm against the gateway.
    pub golden_key: String,
    /// Passed to the gateway with every request alongside `golden_key`.
    pub user_agent: String,
    /// Sent as the `id` of the `orders_counters` / `chat_bookmarks` runner objects and handed
    /// to `parse_orders_list`. Presumably the account's user id — TODO confirm.
    pub id: i64,
    /// Only used in the startup log line; an empty string is logged when `None`.
    pub username: Option<String>,
    /// Passed to `FunpayGateway::post_runner` with every runner request.
    pub csrf_token: String,
    /// Optional session id passed to `FunpayGateway::post_runner`.
    pub phpsessid: Option<String>,
    /// Broadcast sender on which every produced event is published; send errors are ignored.
    pub events_tx: Sender<Event>,
    /// Persistence backend for `last_messages_ids`: loaded once when polling starts and saved
    /// whenever a stored id changes.
    pub storage: Arc<dyn StateStorage>,
    /// Pause between two polling iterations.
    pub polling_interval: Duration,
    /// Pause before retrying after a failed runner request.
    pub error_retry_delay: Duration,

    /// Tag sent with the `chat_bookmarks` runner object; replaced by the tag of each response.
    pub last_msg_event_tag: String,
    /// Tag sent with the `orders_counters` runner object; replaced by the tag of each response.
    pub last_order_event_tag: String,
    /// Per chat id: `(node_msg_id, user_msg_id, last_message_text)` as recorded from parsed
    /// chat bookmarks. Used to detect chats whose last message advanced.
    pub last_messages: HashMap<i64, (i64, i64, Option<String>)>,
    /// Per chat id: highest message id already processed. Fetched messages with an id at or
    /// below this value are dropped as already seen.
    pub last_messages_ids: HashMap<i64, i64>,
    /// Orders from the most recent successful sales-list fetch, keyed by order id; the next
    /// fetch is diffed against this map.
    pub saved_orders: HashMap<OrderId, OrderShortcut>,
}
37
38impl FunPayPoller {
39 pub async fn start(mut self) -> Result<(), FunPayError> {
40 self.load_last_messages_ids().await;
41 debug!(
42 target: "funpay_client",
43 "Starting polling loop for {}",
44 self.username.clone().unwrap_or_default()
45 );
46
47 let mut first = true;
48 loop {
49 let orders = json!({
50 "type": "orders_counters",
51 "id": self.id,
52 "tag": self.last_order_event_tag,
53 "data": false
54 });
55 let chats = json!({
56 "type": "chat_bookmarks",
57 "id": self.id,
58 "tag": self.last_msg_event_tag,
59 "data": false
60 });
61 let objects_json = to_string(&json!([orders, chats])).unwrap();
62
63 let updates = match self.post_runner(objects_json).await {
64 Ok(updates) => updates,
65 Err(e) => {
66 log::error!(target: "funpay_client", "HTTP request failed: {e}. Retrying in {:?}...", self.error_retry_delay);
67 sleep(self.error_retry_delay).await;
68 continue;
69 }
70 };
71
72 let (evs, changed_chats) = self.parse_events_from_updates(&updates, first);
73 for ev in evs {
74 let _ = self.events_tx.send(ev);
75 }
76
77 let mut persist_required = false;
78 if !changed_chats.is_empty() {
79 match self.fetch_chats_histories(&changed_chats).await {
80 Ok(mut histories) => {
81 for (cid, mut msgs) in histories.drain() {
82 if let Some(last_id) = self.last_messages_ids.get(&cid).copied() {
83 msgs.retain(|m| m.id > last_id);
84 }
85 if let Some(max_id) = msgs.iter().map(|m| m.id).max() {
86 let prev = self.last_messages_ids.insert(cid, max_id);
87 if prev != Some(max_id) {
88 persist_required = true;
89 }
90 }
91 if !first {
92 for m in msgs {
93 let _ = self.events_tx.send(Event::NewMessage { message: m });
94 }
95 }
96 }
97 }
98 Err(e) => {
99 log::error!(target: "funpay_client", "Failed to fetch chat histories: {e}");
100 }
101 }
102 }
103
104 if persist_required {
105 self.persist_last_messages_ids().await;
106 }
107
108 match self.fetch_sales_list().await {
109 Ok(list) => {
110 let mut new_map: HashMap<OrderId, OrderShortcut> = HashMap::new();
111 for o in list.into_iter() {
112 new_map.insert(o.id.clone(), o);
113 }
114 if self.saved_orders.is_empty() {
115 for order in new_map.values() {
116 let _ = self.events_tx.send(Event::InitialOrder {
117 order: order.clone(),
118 });
119 }
120 } else {
121 for (id, order) in new_map.iter() {
122 if let Some(prev) = self.saved_orders.get(id) {
123 if prev.status != order.status {
124 let _ = self.events_tx.send(Event::OrderStatusChanged {
125 order: order.clone(),
126 });
127 }
128 } else {
129 let _ = self.events_tx.send(Event::NewOrder {
130 order: order.clone(),
131 });
132 if order.status == crate::models::enums::OrderStatus::Closed {
133 let _ = self.events_tx.send(Event::OrderStatusChanged {
134 order: order.clone(),
135 });
136 }
137 }
138 }
139 }
140 self.saved_orders = new_map;
141 }
142 Err(e) => {
143 log::error!(target: "funpay_client", "Failed to fetch sales list: {e}");
144 }
145 }
146
147 first = false;
148 sleep(self.polling_interval).await;
149 }
150 }
151
152 async fn post_runner(&self, objects_json: String) -> Result<Value, FunPayError> {
153 self.gateway
154 .post_runner(
155 &self.golden_key,
156 &self.user_agent,
157 &self.csrf_token,
158 self.phpsessid.as_deref(),
159 &objects_json,
160 None,
161 )
162 .await
163 }
164
165 fn parse_events_from_updates(
166 &mut self,
167 updates: &Value,
168 first: bool,
169 ) -> (Vec<Event>, Vec<(i64, Option<String>)>) {
170 let mut events = Vec::new();
171 let mut changed_chats: Vec<ChatShortcut> = Vec::new();
172 let objects = updates
173 .get("objects")
174 .and_then(|x| x.as_array())
175 .cloned()
176 .unwrap_or_default();
177 for obj in objects {
178 let typ = obj.get("type").and_then(|x| x.as_str()).unwrap_or("");
179 if typ == "chat_bookmarks" {
180 if let Some(tag) = obj.get("tag").and_then(|x| x.as_str()) {
181 self.last_msg_event_tag = tag.to_string();
182 }
183 let html = obj
184 .get("data")
185 .and_then(|x| x.get("html"))
186 .and_then(|x| x.as_str())
187 .unwrap_or("");
188 if html.is_empty() {
189 continue;
190 }
191 let chats = self.parse_chat_bookmarks(html);
192 if first {
193 for ch in chats {
194 events.push(Event::InitialChat { chat: ch.clone() });
195 if ch.node_msg_id > 0 {
196 changed_chats.push(ch);
197 }
198 }
199 } else {
200 if !chats.is_empty() {
201 events.push(Event::ChatsListChanged);
202 }
203 for ch in chats {
204 let prev = self
205 .last_messages
206 .get(&ch.id)
207 .cloned()
208 .unwrap_or((-1, -1, None));
209 if ch.node_msg_id > prev.0 {
210 events.push(Event::LastChatMessageChanged { chat: ch.clone() });
211 changed_chats.push(ch.clone());
212 }
213 self.last_messages.insert(
214 ch.id,
215 (ch.node_msg_id, ch.user_msg_id, ch.last_message_text.clone()),
216 );
217 }
218 }
219 } else if typ == "orders_counters" {
220 if let Some(tag) = obj.get("tag").and_then(|x| x.as_str()) {
221 self.last_order_event_tag = tag.to_string();
222 }
223 let purchases = obj
224 .get("data")
225 .and_then(|x| x.get("buyer"))
226 .and_then(|x| x.as_i64())
227 .unwrap_or(0) as i32;
228 let sales = obj
229 .get("data")
230 .and_then(|x| x.get("seller"))
231 .and_then(|x| x.as_i64())
232 .unwrap_or(0) as i32;
233 events.push(Event::OrdersListChanged { purchases, sales });
234 }
235 }
236 let chats_data: Vec<(i64, Option<String>)> = changed_chats
237 .into_iter()
238 .map(|c| (c.id, Some(c.name)))
239 .collect();
240 (events, chats_data)
241 }
242
243 fn parse_chat_bookmarks(&mut self, html: &str) -> Vec<ChatShortcut> {
244 let doc = Html::parse_fragment(html);
245 let sel_chat = Selector::parse("a.contact-item").unwrap();
246 let sel_msg = Selector::parse("div.contact-item-message").unwrap();
247 let sel_name = Selector::parse("div.media-user-name").unwrap();
248 let mut out = Vec::new();
249 for el in doc.select(&sel_chat) {
250 let id_attr = el.value().attr("data-id").unwrap_or("0");
251 let id = id_attr.parse::<i64>().unwrap_or(0);
252 let node_msg_id = el
253 .value()
254 .attr("data-node-msg")
255 .unwrap_or("0")
256 .parse::<i64>()
257 .unwrap_or(0);
258 let user_msg_id = el
259 .value()
260 .attr("data-user-msg")
261 .unwrap_or("0")
262 .parse::<i64>()
263 .unwrap_or(0);
264 let unread = el.value().classes().any(|c| c == "unread");
265 let last_message_text = el
266 .select(&sel_msg)
267 .next()
268 .map(|n| n.text().collect::<String>());
269 let name = el
270 .select(&sel_name)
271 .next()
272 .map(|n| n.text().collect::<String>())
273 .unwrap_or_default();
274 out.push(ChatShortcut {
275 id,
276 name,
277 last_message_text,
278 node_msg_id,
279 user_msg_id,
280 unread,
281 });
282 }
283 out
284 }
285
286 async fn fetch_sales_list(&self) -> Result<Vec<OrderShortcut>, FunPayError> {
287 let body = self
288 .gateway
289 .get_orders_trade(&self.golden_key, &self.user_agent)
290 .await?;
291 parse_orders_list(&body, self.id)
292 }
293
294 async fn fetch_chats_histories(
295 &self,
296 chats_data: &[(i64, Option<String>)],
297 ) -> Result<HashMap<i64, Vec<Message>>, FunPayError> {
298 let mut objects = Vec::with_capacity(chats_data.len());
299 for (chat_id, _name) in chats_data.iter() {
300 objects.push(json!({
301 "type": "chat_node",
302 "id": chat_id,
303 "tag": "00000000",
304 "data": {"node": chat_id, "last_message": -1, "content": ""}
305 }));
306 }
307 let objects_json = to_string(&objects).unwrap();
308 let res = self.post_runner(objects_json).await?;
309 let mut out: HashMap<i64, Vec<Message>> = HashMap::new();
310 let objects = res
311 .get("objects")
312 .and_then(|x| x.as_array())
313 .cloned()
314 .unwrap_or_default();
315 for obj in objects {
316 if obj.get("type").and_then(|x| x.as_str()) != Some("chat_node") {
317 continue;
318 }
319 let id = obj.get("id").and_then(|x| x.as_i64()).unwrap_or(0);
320 let data = obj.get("data");
321 if data.is_none() {
322 out.insert(id, Vec::new());
323 continue;
324 }
325 let data = data.unwrap();
326 let messages = data
327 .get("messages")
328 .and_then(|x| x.as_array())
329 .cloned()
330 .unwrap_or_default();
331 let mut list = Vec::new();
332 for m in messages {
333 let mid = m.get("id").and_then(|x| x.as_i64()).unwrap_or(0);
334 let author_id = m.get("author").and_then(|x| x.as_i64()).unwrap_or(0);
335 let html = m.get("html").and_then(|x| x.as_str()).unwrap_or("");
336 let (text, _image) = parse_message_html(html);
337 list.push(Message {
338 id: mid,
339 chat_id: ChatId::from(format!("{id}")),
340 chat_name: chats_data
341 .iter()
342 .find(|(cid, _)| *cid == id)
343 .and_then(|(_, n)| n.clone()),
344 text,
345 interlocutor_id: None,
346 author_id,
347 });
348 }
349 out.insert(id, list);
350 }
351 Ok(out)
352 }
353
354 async fn load_last_messages_ids(&mut self) {
355 match self.storage.load().await {
356 Ok(stored) => {
357 self.last_messages_ids = stored;
358 }
359 Err(e) => {
360 log::error!(target: "funpay_client", "Failed to load last messages store: {e}");
361 }
362 }
363 }
364
365 async fn persist_last_messages_ids(&self) {
366 if let Err(e) = self.storage.save(&self.last_messages_ids).await {
367 log::error!(target: "funpay_client", "Failed to persist last messages ids: {e}");
368 }
369 }
370}