1use super::{Database, now_ms};
4use crate::types::{ClaimEvent, ClaimEventType, ClaimUpdates, FileLock};
5use anyhow::Result;
6use rusqlite::params;
7use std::collections::{HashMap, HashSet};
8
9impl Database {
10 pub fn lock_file(
13 &self,
14 file_path: String,
15 worker_id: &str,
16 reason: Option<String>,
17 task_id: Option<String>,
18 ) -> Result<Option<String>> {
19 let now = now_ms();
20
21 self.with_conn_mut(|conn| {
22 let tx = conn.transaction()?;
23 let existing: Option<String> = tx
25 .query_row(
26 "SELECT worker_id FROM file_locks WHERE file_path = ?1",
27 params![&file_path],
28 |row| row.get(0),
29 )
30 .ok();
31
32 let result = if let Some(existing_worker) = existing {
33 if existing_worker != worker_id {
34 Some(existing_worker)
36 } else {
37 tx.execute(
39 "UPDATE file_locks SET locked_at = ?1, reason = ?2, task_id = ?3 WHERE file_path = ?4",
40 params![now, &reason, &task_id, &file_path],
41 )?;
42 None
43 }
44 } else {
45 tx.execute(
47 "INSERT INTO file_locks (file_path, worker_id, reason, locked_at, task_id) VALUES (?1, ?2, ?3, ?4, ?5)",
48 params![&file_path, worker_id, &reason, now, &task_id],
49 )?;
50
51 tx.execute(
53 "INSERT INTO claim_sequence (file_path, worker_id, event, reason, timestamp) VALUES (?1, ?2, 'claimed', ?3, ?4)",
54 params![&file_path, worker_id, &reason, now],
55 )?;
56 None
57 };
58
59 tx.commit()?;
60 Ok(result)
61 })
62 }
63
64 pub fn unlock_file(
66 &self,
67 file_path: &str,
68 worker_id: &str,
69 reason: Option<String>,
70 ) -> Result<bool> {
71 let now = now_ms();
72
73 self.with_conn_mut(|conn| {
74 let tx = conn.transaction()?;
75
76 let deleted = tx.execute(
77 "DELETE FROM file_locks WHERE file_path = ?1 AND worker_id = ?2",
78 params![file_path, worker_id],
79 )?;
80
81 if deleted > 0 {
82 let claim_id: Option<i64> = tx.query_row(
84 "SELECT MAX(id) FROM claim_sequence
85 WHERE file_path = ?1 AND worker_id = ?2 AND event = 'claimed'",
86 params![file_path, worker_id],
87 |row| row.get(0),
88 ).ok().flatten();
89
90 tx.execute(
92 "UPDATE claim_sequence SET end_timestamp = ?1
93 WHERE file_path = ?2 AND worker_id = ?3 AND end_timestamp IS NULL",
94 params![now, file_path, worker_id],
95 )?;
96
97 tx.execute(
99 "INSERT INTO claim_sequence (file_path, worker_id, event, reason, timestamp, claim_id)
100 VALUES (?1, ?2, 'released', ?3, ?4, ?5)",
101 params![file_path, worker_id, &reason, now, claim_id],
102 )?;
103 }
104
105 tx.commit()?;
106 Ok(deleted > 0)
107 })
108 }
109
110 pub fn unlock_files_verbose(
113 &self,
114 file_paths: Vec<String>,
115 worker_id: &str,
116 reason: Option<String>,
117 ) -> Result<Vec<(String, String)>> {
118 let now = now_ms();
119 let mut released = Vec::new();
120
121 self.with_conn_mut(|conn| {
122 let tx = conn.transaction()?;
123
124 for file_path in file_paths {
125 let deleted = tx.execute(
126 "DELETE FROM file_locks WHERE file_path = ?1 AND worker_id = ?2",
127 params![&file_path, worker_id],
128 )?;
129
130 if deleted > 0 {
131 let claim_id: Option<i64> = tx.query_row(
133 "SELECT MAX(id) FROM claim_sequence
134 WHERE file_path = ?1 AND worker_id = ?2 AND event = 'claimed'",
135 params![&file_path, worker_id],
136 |row| row.get(0),
137 ).ok().flatten();
138
139 tx.execute(
141 "UPDATE claim_sequence SET end_timestamp = ?1
142 WHERE file_path = ?2 AND worker_id = ?3 AND end_timestamp IS NULL",
143 params![now, &file_path, worker_id],
144 )?;
145
146 tx.execute(
148 "INSERT INTO claim_sequence (file_path, worker_id, event, reason, timestamp, claim_id)
149 VALUES (?1, ?2, 'released', ?3, ?4, ?5)",
150 params![&file_path, worker_id, &reason, now, claim_id],
151 )?;
152
153 released.push((file_path, worker_id.to_string()));
154 }
155 }
156
157 tx.commit()?;
158 Ok(released)
159 })
160 }
161
162 pub fn release_worker_locks_verbose(
165 &self,
166 worker_id: &str,
167 reason: Option<String>,
168 ) -> Result<Vec<(String, String)>> {
169 let now = now_ms();
170
171 self.with_conn_mut(|conn| {
172 let tx = conn.transaction()?;
173
174 let files_to_release: Vec<String> = {
176 let mut stmt =
177 tx.prepare("SELECT file_path FROM file_locks WHERE worker_id = ?1")?;
178 stmt.query_map(params![worker_id], |row| row.get::<_, String>(0))?
179 .filter_map(|r| r.ok())
180 .collect()
181 };
182
183 if files_to_release.is_empty() {
184 tx.commit()?;
185 return Ok(Vec::new());
186 }
187
188 tx.execute(
190 "UPDATE claim_sequence SET end_timestamp = ?1
191 WHERE worker_id = ?2 AND end_timestamp IS NULL",
192 params![now, worker_id],
193 )?;
194
195 for file_path in &files_to_release {
197 tx.execute(
198 "INSERT INTO claim_sequence (file_path, worker_id, event, reason, timestamp)
199 VALUES (?1, ?2, 'released', ?3, ?4)",
200 params![file_path, worker_id, &reason, now],
201 )?;
202 }
203
204 tx.execute(
206 "DELETE FROM file_locks WHERE worker_id = ?1",
207 params![worker_id],
208 )?;
209
210 tx.commit()?;
211
212 let released: Vec<(String, String)> = files_to_release
213 .into_iter()
214 .map(|f| (f, worker_id.to_string()))
215 .collect();
216
217 Ok(released)
218 })
219 }
220
221 pub fn release_task_locks_verbose(
224 &self,
225 task_id: &str,
226 reason: Option<String>,
227 ) -> Result<Vec<(String, String)>> {
228 let now = now_ms();
229
230 self.with_conn_mut(|conn| {
231 let tx = conn.transaction()?;
232
233 let files_to_release: Vec<(String, String)> = {
235 let mut stmt =
236 tx.prepare("SELECT file_path, worker_id FROM file_locks WHERE task_id = ?1")?;
237 stmt.query_map(params![task_id], |row| {
238 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
239 })?
240 .filter_map(|r| r.ok())
241 .collect()
242 };
243
244 if files_to_release.is_empty() {
245 tx.commit()?;
246 return Ok(Vec::new());
247 }
248
249 for (file_path, worker_id) in &files_to_release {
251 tx.execute(
252 "UPDATE claim_sequence SET end_timestamp = ?1
253 WHERE file_path = ?2 AND worker_id = ?3 AND end_timestamp IS NULL",
254 params![now, file_path, worker_id],
255 )?;
256
257 let reason_str = reason.as_deref().unwrap_or("task release");
259 tx.execute(
260 "INSERT INTO claim_sequence (file_path, worker_id, event, reason, timestamp)
261 VALUES (?1, ?2, 'released', ?3, ?4)",
262 params![file_path, worker_id, reason_str, now],
263 )?;
264 }
265
266 tx.execute(
268 "DELETE FROM file_locks WHERE task_id = ?1",
269 params![task_id],
270 )?;
271
272 tx.commit()?;
273 Ok(files_to_release)
274 })
275 }
276
277 pub fn claim_updates(&self, worker_id: &str) -> Result<ClaimUpdates> {
280 self.with_conn(|conn| {
281 let last_seq: i64 = conn
283 .query_row(
284 "SELECT last_claim_sequence FROM workers WHERE id = ?1",
285 params![worker_id],
286 |row| row.get(0),
287 )
288 .unwrap_or(0);
289
290 let mut stmt = conn.prepare(
293 "SELECT id, file_path, worker_id, event, reason, timestamp, end_timestamp, claim_id
294 FROM claim_sequence
295 WHERE id >= ?1
296 ORDER BY id",
297 )?;
298 let events: Vec<ClaimEvent> = stmt
299 .query_map(params![last_seq], |row| {
300 Ok(ClaimEvent {
301 id: row.get(0)?,
302 file_path: row.get(1)?,
303 worker_id: row.get(2)?,
304 event: ClaimEventType::parse(&row.get::<_, String>(3)?)
305 .unwrap_or(ClaimEventType::Claimed),
306 reason: row.get(4)?,
307 timestamp: row.get(5)?,
308 end_timestamp: row.get(6)?,
309 claim_id: row.get(7)?,
310 })
311 })?
312 .filter_map(|r| r.ok())
313 .collect();
314
315 let max_seen = events.iter().map(|e| e.id).max();
318 let new_seq = match max_seen {
319 Some(max) => max + 1, None => last_seq, };
322
323 if new_seq > last_seq {
325 conn.execute(
326 "UPDATE workers SET last_claim_sequence = ?1 WHERE id = ?2",
327 params![new_seq, worker_id],
328 )?;
329 }
330
331 let new_claims: Vec<ClaimEvent> = events
333 .iter()
334 .filter(|e| e.event == ClaimEventType::Claimed)
335 .cloned()
336 .collect();
337
338 let new_claim_ids: HashSet<i64> = new_claims.iter().map(|c| c.id).collect();
342
343 let dropped_claims: Vec<ClaimEvent> = events
344 .iter()
345 .filter(|e| e.event == ClaimEventType::Released)
346 .filter(|release| {
347 match release.claim_id {
348 Some(cid) => cid < last_seq || new_claim_ids.contains(&cid),
349 None => true, }
351 })
352 .cloned()
353 .collect();
354
355 Ok(ClaimUpdates {
356 new_claims,
357 dropped_claims,
358 sequence: new_seq,
359 })
360 })
361 }
362
363 pub fn get_file_locks(
365 &self,
366 file_paths: Option<Vec<String>>,
367 agent_id: Option<&str>,
368 task_id: Option<&str>,
369 ) -> Result<HashMap<String, FileLock>> {
370 self.with_conn(|conn| {
371 let locks = if let Some(paths) = file_paths {
372 if paths.is_empty() {
373 return Ok(HashMap::new());
374 }
375
376 let placeholders: Vec<String> = paths.iter().map(|_| "?".to_string()).collect();
377 let sql = format!(
378 "SELECT file_path, worker_id, reason, locked_at, task_id FROM file_locks WHERE file_path IN ({})",
379 placeholders.join(", ")
380 );
381
382 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
383 for path in &paths {
384 params_vec.push(Box::new(path.clone()));
385 }
386
387 let params_refs: Vec<&dyn rusqlite::ToSql> =
388 params_vec.iter().map(|b| b.as_ref()).collect();
389
390 let mut stmt = conn.prepare(&sql)?;
391 stmt.query_map(params_refs.as_slice(), |row| {
392 let file_path: String = row.get(0)?;
393 Ok((file_path.clone(), FileLock {
394 file_path,
395 worker_id: row.get(1)?,
396 reason: row.get(2)?,
397 locked_at: row.get(3)?,
398 task_id: row.get(4)?,
399 }))
400 })?
401 .filter_map(|r| r.ok())
402 .collect()
403 } else if let Some(aid) = agent_id {
404 let mut stmt = conn.prepare(
405 "SELECT file_path, worker_id, reason, locked_at, task_id FROM file_locks WHERE worker_id = ?1",
406 )?;
407 stmt.query_map(params![aid], |row| {
408 let file_path: String = row.get(0)?;
409 Ok((file_path.clone(), FileLock {
410 file_path,
411 worker_id: row.get(1)?,
412 reason: row.get(2)?,
413 locked_at: row.get(3)?,
414 task_id: row.get(4)?,
415 }))
416 })?
417 .filter_map(|r| r.ok())
418 .collect()
419 } else if let Some(tid) = task_id {
420 let mut stmt = conn.prepare(
421 "SELECT file_path, worker_id, reason, locked_at, task_id FROM file_locks WHERE task_id = ?1",
422 )?;
423 stmt.query_map(params![tid], |row| {
424 let file_path: String = row.get(0)?;
425 Ok((file_path.clone(), FileLock {
426 file_path,
427 worker_id: row.get(1)?,
428 reason: row.get(2)?,
429 locked_at: row.get(3)?,
430 task_id: row.get(4)?,
431 }))
432 })?
433 .filter_map(|r| r.ok())
434 .collect()
435 } else {
436 HashMap::new()
438 };
439
440 Ok(locks)
441 })
442 }
443
444 pub fn get_all_file_locks(&self) -> Result<Vec<FileLock>> {
446 self.with_conn(|conn| {
447 let mut stmt = conn.prepare(
448 "SELECT file_path, worker_id, reason, locked_at, task_id FROM file_locks",
449 )?;
450
451 let locks = stmt
452 .query_map([], |row| {
453 let file_path: String = row.get(0)?;
454 let worker_id: String = row.get(1)?;
455 let reason: Option<String> = row.get(2)?;
456 let locked_at: i64 = row.get(3)?;
457 let task_id: Option<String> = row.get(4)?;
458 Ok(FileLock {
459 file_path,
460 worker_id,
461 reason,
462 locked_at,
463 task_id,
464 })
465 })?
466 .filter_map(|r| r.ok())
467 .collect();
468
469 Ok(locks)
470 })
471 }
472
473 pub fn release_worker_locks(&self, worker_id: &str) -> Result<i32> {
475 let now = now_ms();
476
477 self.with_conn(|conn| {
478 conn.execute(
480 "UPDATE claim_sequence SET end_timestamp = ?1
481 WHERE worker_id = ?2 AND end_timestamp IS NULL",
482 params![now, worker_id],
483 )?;
484
485 let deleted = conn.execute(
486 "DELETE FROM file_locks WHERE worker_id = ?1",
487 params![worker_id],
488 )?;
489
490 Ok(deleted as i32)
491 })
492 }
493
494 pub fn release_task_locks(&self, task_id: &str) -> Result<i32> {
497 let now = now_ms();
498
499 self.with_conn(|conn| {
500 let files_to_release: Vec<(String, String)> = {
502 let mut stmt =
503 conn.prepare("SELECT file_path, worker_id FROM file_locks WHERE task_id = ?1")?;
504 stmt.query_map(params![task_id], |row| {
505 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
506 })?
507 .filter_map(|r| r.ok())
508 .collect()
509 };
510
511 for (file_path, worker_id) in &files_to_release {
513 conn.execute(
514 "UPDATE claim_sequence SET end_timestamp = ?1
515 WHERE file_path = ?2 AND worker_id = ?3 AND end_timestamp IS NULL",
516 params![now, file_path, worker_id],
517 )?;
518
519 conn.execute(
521 "INSERT INTO claim_sequence (file_path, worker_id, event, reason, timestamp)
522 VALUES (?1, ?2, 'released', 'task completed', ?3)",
523 params![file_path, worker_id, now],
524 )?;
525 }
526
527 let deleted = conn.execute(
528 "DELETE FROM file_locks WHERE task_id = ?1",
529 params![task_id],
530 )?;
531
532 Ok(deleted as i32)
533 })
534 }
535}