1use super::{Database, now_ms};
10use crate::types::{ClaimEvent, ClaimEventType, ClaimUpdates, FileLock};
11use anyhow::Result;
12use rusqlite::params;
13use std::collections::{HashMap, HashSet};
14
15pub enum ExclusiveLockResult {
17 Acquired,
19 AlreadyHeldBySelf,
21 HeldByOther(String),
23}
24
25impl Database {
26 pub fn lock_file_exclusive(
36 &self,
37 file_path: String,
38 worker_id: &str,
39 reason: Option<String>,
40 task_id: Option<String>,
41 ) -> Result<ExclusiveLockResult> {
42 let now = now_ms();
43
44 self.with_conn_mut(|conn| {
45 let tx = conn.transaction()?;
46
47 let existing: Option<String> = tx
49 .query_row(
50 "SELECT worker_id FROM file_locks WHERE file_path = ?1",
51 params![&file_path],
52 |row| row.get(0),
53 )
54 .ok();
55
56 let result = if let Some(existing_worker) = existing {
57 if existing_worker != worker_id {
58 ExclusiveLockResult::HeldByOther(existing_worker)
60 } else {
61 tx.execute(
63 "UPDATE file_locks SET locked_at = ?1, reason = ?2, task_id = ?3 WHERE file_path = ?4",
64 params![now, &reason, &task_id, &file_path],
65 )?;
66 ExclusiveLockResult::AlreadyHeldBySelf
67 }
68 } else {
69 tx.execute(
71 "INSERT INTO file_locks (file_path, worker_id, reason, locked_at, task_id) VALUES (?1, ?2, ?3, ?4, ?5)",
72 params![&file_path, worker_id, &reason, now, &task_id],
73 )?;
74
75 tx.execute(
77 "INSERT INTO claim_sequence (file_path, worker_id, event, reason, timestamp) VALUES (?1, ?2, 'claimed', ?3, ?4)",
78 params![&file_path, worker_id, &reason, now],
79 )?;
80 ExclusiveLockResult::Acquired
81 };
82
83 tx.commit()?;
84 Ok(result)
85 })
86 }
87
88 pub fn lock_file(
91 &self,
92 file_path: String,
93 worker_id: &str,
94 reason: Option<String>,
95 task_id: Option<String>,
96 ) -> Result<Option<String>> {
97 let now = now_ms();
98
99 self.with_conn_mut(|conn| {
100 let tx = conn.transaction()?;
101 let existing: Option<String> = tx
103 .query_row(
104 "SELECT worker_id FROM file_locks WHERE file_path = ?1",
105 params![&file_path],
106 |row| row.get(0),
107 )
108 .ok();
109
110 let result = if let Some(existing_worker) = existing {
111 if existing_worker != worker_id {
112 Some(existing_worker)
114 } else {
115 tx.execute(
117 "UPDATE file_locks SET locked_at = ?1, reason = ?2, task_id = ?3 WHERE file_path = ?4",
118 params![now, &reason, &task_id, &file_path],
119 )?;
120 None
121 }
122 } else {
123 tx.execute(
125 "INSERT INTO file_locks (file_path, worker_id, reason, locked_at, task_id) VALUES (?1, ?2, ?3, ?4, ?5)",
126 params![&file_path, worker_id, &reason, now, &task_id],
127 )?;
128
129 tx.execute(
131 "INSERT INTO claim_sequence (file_path, worker_id, event, reason, timestamp) VALUES (?1, ?2, 'claimed', ?3, ?4)",
132 params![&file_path, worker_id, &reason, now],
133 )?;
134 None
135 };
136
137 tx.commit()?;
138 Ok(result)
139 })
140 }
141
142 pub fn unlock_file(
144 &self,
145 file_path: &str,
146 worker_id: &str,
147 reason: Option<String>,
148 ) -> Result<bool> {
149 let now = now_ms();
150
151 self.with_conn_mut(|conn| {
152 let tx = conn.transaction()?;
153
154 let deleted = tx.execute(
155 "DELETE FROM file_locks WHERE file_path = ?1 AND worker_id = ?2",
156 params![file_path, worker_id],
157 )?;
158
159 if deleted > 0 {
160 let claim_id: Option<i64> = tx.query_row(
162 "SELECT MAX(id) FROM claim_sequence
163 WHERE file_path = ?1 AND worker_id = ?2 AND event = 'claimed'",
164 params![file_path, worker_id],
165 |row| row.get(0),
166 ).ok().flatten();
167
168 tx.execute(
170 "UPDATE claim_sequence SET end_timestamp = ?1
171 WHERE file_path = ?2 AND worker_id = ?3 AND end_timestamp IS NULL",
172 params![now, file_path, worker_id],
173 )?;
174
175 tx.execute(
177 "INSERT INTO claim_sequence (file_path, worker_id, event, reason, timestamp, claim_id)
178 VALUES (?1, ?2, 'released', ?3, ?4, ?5)",
179 params![file_path, worker_id, &reason, now, claim_id],
180 )?;
181 }
182
183 tx.commit()?;
184 Ok(deleted > 0)
185 })
186 }
187
188 pub fn unlock_files_verbose(
191 &self,
192 file_paths: Vec<String>,
193 worker_id: &str,
194 reason: Option<String>,
195 ) -> Result<Vec<(String, String)>> {
196 let now = now_ms();
197 let mut released = Vec::new();
198
199 self.with_conn_mut(|conn| {
200 let tx = conn.transaction()?;
201
202 for file_path in file_paths {
203 let deleted = tx.execute(
204 "DELETE FROM file_locks WHERE file_path = ?1 AND worker_id = ?2",
205 params![&file_path, worker_id],
206 )?;
207
208 if deleted > 0 {
209 let claim_id: Option<i64> = tx.query_row(
211 "SELECT MAX(id) FROM claim_sequence
212 WHERE file_path = ?1 AND worker_id = ?2 AND event = 'claimed'",
213 params![&file_path, worker_id],
214 |row| row.get(0),
215 ).ok().flatten();
216
217 tx.execute(
219 "UPDATE claim_sequence SET end_timestamp = ?1
220 WHERE file_path = ?2 AND worker_id = ?3 AND end_timestamp IS NULL",
221 params![now, &file_path, worker_id],
222 )?;
223
224 tx.execute(
226 "INSERT INTO claim_sequence (file_path, worker_id, event, reason, timestamp, claim_id)
227 VALUES (?1, ?2, 'released', ?3, ?4, ?5)",
228 params![&file_path, worker_id, &reason, now, claim_id],
229 )?;
230
231 released.push((file_path, worker_id.to_string()));
232 }
233 }
234
235 tx.commit()?;
236 Ok(released)
237 })
238 }
239
240 pub fn release_worker_locks_verbose(
243 &self,
244 worker_id: &str,
245 reason: Option<String>,
246 ) -> Result<Vec<(String, String)>> {
247 let now = now_ms();
248
249 self.with_conn_mut(|conn| {
250 let tx = conn.transaction()?;
251
252 let files_to_release: Vec<String> = {
254 let mut stmt =
255 tx.prepare("SELECT file_path FROM file_locks WHERE worker_id = ?1")?;
256 stmt.query_map(params![worker_id], |row| row.get::<_, String>(0))?
257 .filter_map(|r| r.ok())
258 .collect()
259 };
260
261 if files_to_release.is_empty() {
262 tx.commit()?;
263 return Ok(Vec::new());
264 }
265
266 tx.execute(
268 "UPDATE claim_sequence SET end_timestamp = ?1
269 WHERE worker_id = ?2 AND end_timestamp IS NULL",
270 params![now, worker_id],
271 )?;
272
273 for file_path in &files_to_release {
275 tx.execute(
276 "INSERT INTO claim_sequence (file_path, worker_id, event, reason, timestamp)
277 VALUES (?1, ?2, 'released', ?3, ?4)",
278 params![file_path, worker_id, &reason, now],
279 )?;
280 }
281
282 tx.execute(
284 "DELETE FROM file_locks WHERE worker_id = ?1",
285 params![worker_id],
286 )?;
287
288 tx.commit()?;
289
290 let released: Vec<(String, String)> = files_to_release
291 .into_iter()
292 .map(|f| (f, worker_id.to_string()))
293 .collect();
294
295 Ok(released)
296 })
297 }
298
299 pub fn release_task_locks_verbose(
302 &self,
303 task_id: &str,
304 reason: Option<String>,
305 ) -> Result<Vec<(String, String)>> {
306 let now = now_ms();
307
308 self.with_conn_mut(|conn| {
309 let tx = conn.transaction()?;
310
311 let files_to_release: Vec<(String, String)> = {
313 let mut stmt =
314 tx.prepare("SELECT file_path, worker_id FROM file_locks WHERE task_id = ?1")?;
315 stmt.query_map(params![task_id], |row| {
316 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
317 })?
318 .filter_map(|r| r.ok())
319 .collect()
320 };
321
322 if files_to_release.is_empty() {
323 tx.commit()?;
324 return Ok(Vec::new());
325 }
326
327 for (file_path, worker_id) in &files_to_release {
329 tx.execute(
330 "UPDATE claim_sequence SET end_timestamp = ?1
331 WHERE file_path = ?2 AND worker_id = ?3 AND end_timestamp IS NULL",
332 params![now, file_path, worker_id],
333 )?;
334
335 let reason_str = reason.as_deref().unwrap_or("task release");
337 tx.execute(
338 "INSERT INTO claim_sequence (file_path, worker_id, event, reason, timestamp)
339 VALUES (?1, ?2, 'released', ?3, ?4)",
340 params![file_path, worker_id, reason_str, now],
341 )?;
342 }
343
344 tx.execute(
346 "DELETE FROM file_locks WHERE task_id = ?1",
347 params![task_id],
348 )?;
349
350 tx.commit()?;
351 Ok(files_to_release)
352 })
353 }
354
355 pub fn claim_updates(&self, worker_id: &str) -> Result<ClaimUpdates> {
358 self.with_conn(|conn| {
359 let last_seq: i64 = conn
361 .query_row(
362 "SELECT last_claim_sequence FROM workers WHERE id = ?1",
363 params![worker_id],
364 |row| row.get(0),
365 )
366 .unwrap_or(0);
367
368 let mut stmt = conn.prepare(
371 "SELECT id, file_path, worker_id, event, reason, timestamp, end_timestamp, claim_id
372 FROM claim_sequence
373 WHERE id >= ?1
374 ORDER BY id",
375 )?;
376 let events: Vec<ClaimEvent> = stmt
377 .query_map(params![last_seq], |row| {
378 Ok(ClaimEvent {
379 id: row.get(0)?,
380 file_path: row.get(1)?,
381 worker_id: row.get(2)?,
382 event: ClaimEventType::parse(&row.get::<_, String>(3)?)
383 .unwrap_or(ClaimEventType::Claimed),
384 reason: row.get(4)?,
385 timestamp: row.get(5)?,
386 end_timestamp: row.get(6)?,
387 claim_id: row.get(7)?,
388 })
389 })?
390 .filter_map(|r| r.ok())
391 .collect();
392
393 let max_seen = events.iter().map(|e| e.id).max();
396 let new_seq = match max_seen {
397 Some(max) => max + 1, None => last_seq, };
400
401 if new_seq > last_seq {
403 conn.execute(
404 "UPDATE workers SET last_claim_sequence = ?1 WHERE id = ?2",
405 params![new_seq, worker_id],
406 )?;
407 }
408
409 let new_claims: Vec<ClaimEvent> = events
411 .iter()
412 .filter(|e| e.event == ClaimEventType::Claimed)
413 .cloned()
414 .collect();
415
416 let new_claim_ids: HashSet<i64> = new_claims.iter().map(|c| c.id).collect();
420
421 let dropped_claims: Vec<ClaimEvent> = events
422 .iter()
423 .filter(|e| e.event == ClaimEventType::Released)
424 .filter(|release| {
425 match release.claim_id {
426 Some(cid) => cid < last_seq || new_claim_ids.contains(&cid),
427 None => true, }
429 })
430 .cloned()
431 .collect();
432
433 Ok(ClaimUpdates {
434 new_claims,
435 dropped_claims,
436 sequence: new_seq,
437 })
438 })
439 }
440
441 pub fn get_file_locks(
443 &self,
444 file_paths: Option<Vec<String>>,
445 agent_id: Option<&str>,
446 task_id: Option<&str>,
447 ) -> Result<HashMap<String, FileLock>> {
448 self.with_conn(|conn| {
449 let locks = if let Some(paths) = file_paths {
450 if paths.is_empty() {
451 return Ok(HashMap::new());
452 }
453
454 let placeholders: Vec<String> = paths.iter().map(|_| "?".to_string()).collect();
455 let sql = format!(
456 "SELECT file_path, worker_id, reason, locked_at, task_id FROM file_locks WHERE file_path IN ({})",
457 placeholders.join(", ")
458 );
459
460 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
461 for path in &paths {
462 params_vec.push(Box::new(path.clone()));
463 }
464
465 let params_refs: Vec<&dyn rusqlite::ToSql> =
466 params_vec.iter().map(|b| b.as_ref()).collect();
467
468 let mut stmt = conn.prepare(&sql)?;
469 stmt.query_map(params_refs.as_slice(), |row| {
470 let file_path: String = row.get(0)?;
471 Ok((file_path.clone(), FileLock {
472 file_path,
473 worker_id: row.get(1)?,
474 reason: row.get(2)?,
475 locked_at: row.get(3)?,
476 task_id: row.get(4)?,
477 }))
478 })?
479 .filter_map(|r| r.ok())
480 .collect()
481 } else if let Some(aid) = agent_id {
482 let mut stmt = conn.prepare(
483 "SELECT file_path, worker_id, reason, locked_at, task_id FROM file_locks WHERE worker_id = ?1",
484 )?;
485 stmt.query_map(params![aid], |row| {
486 let file_path: String = row.get(0)?;
487 Ok((file_path.clone(), FileLock {
488 file_path,
489 worker_id: row.get(1)?,
490 reason: row.get(2)?,
491 locked_at: row.get(3)?,
492 task_id: row.get(4)?,
493 }))
494 })?
495 .filter_map(|r| r.ok())
496 .collect()
497 } else if let Some(tid) = task_id {
498 let mut stmt = conn.prepare(
499 "SELECT file_path, worker_id, reason, locked_at, task_id FROM file_locks WHERE task_id = ?1",
500 )?;
501 stmt.query_map(params![tid], |row| {
502 let file_path: String = row.get(0)?;
503 Ok((file_path.clone(), FileLock {
504 file_path,
505 worker_id: row.get(1)?,
506 reason: row.get(2)?,
507 locked_at: row.get(3)?,
508 task_id: row.get(4)?,
509 }))
510 })?
511 .filter_map(|r| r.ok())
512 .collect()
513 } else {
514 HashMap::new()
516 };
517
518 Ok(locks)
519 })
520 }
521
522 pub fn get_all_file_locks(&self) -> Result<Vec<FileLock>> {
524 self.with_conn(|conn| {
525 let mut stmt = conn.prepare(
526 "SELECT file_path, worker_id, reason, locked_at, task_id FROM file_locks",
527 )?;
528
529 let locks = stmt
530 .query_map([], |row| {
531 let file_path: String = row.get(0)?;
532 let worker_id: String = row.get(1)?;
533 let reason: Option<String> = row.get(2)?;
534 let locked_at: i64 = row.get(3)?;
535 let task_id: Option<String> = row.get(4)?;
536 Ok(FileLock {
537 file_path,
538 worker_id,
539 reason,
540 locked_at,
541 task_id,
542 })
543 })?
544 .filter_map(|r| r.ok())
545 .collect();
546
547 Ok(locks)
548 })
549 }
550
551 pub fn find_file_contention(
565 &self,
566 task_id: &str,
567 worker_id: &str,
568 ) -> Result<Vec<(String, Option<String>, String)>> {
569 self.with_conn(|conn| {
570 let parent_task_id: Option<String> = conn
575 .query_row(
576 "SELECT from_task_id FROM dependencies
577 WHERE to_task_id = ?1 AND dep_type = 'contains'
578 LIMIT 1",
579 params![task_id],
580 |row| row.get(0),
581 )
582 .ok();
583
584 let results = if let Some(parent_id) = parent_task_id {
585 let mut stmt = conn.prepare(
587 "SELECT fl.file_path, fl.task_id, fl.worker_id
588 FROM file_locks fl
589 WHERE fl.worker_id != ?1
590 AND fl.file_path NOT LIKE 'lock:%'
591 AND fl.worker_id IN (
592 SELECT t.worker_id FROM tasks t
593 INNER JOIN dependencies d ON d.to_task_id = t.id
594 WHERE d.from_task_id = ?2 AND d.dep_type = 'contains'
595 AND t.status IN ('working', 'assigned')
596 AND t.worker_id IS NOT NULL
597 )
598 ORDER BY fl.file_path",
599 )?;
600 stmt.query_map(params![worker_id, &parent_id], |row| {
601 Ok((
602 row.get::<_, String>(0)?,
603 row.get::<_, Option<String>>(1)?,
604 row.get::<_, String>(2)?,
605 ))
606 })?
607 .filter_map(|r| r.ok())
608 .collect()
609 } else {
610 let mut stmt = conn.prepare(
612 "SELECT fl.file_path, fl.task_id, fl.worker_id
613 FROM file_locks fl
614 WHERE fl.worker_id != ?1
615 AND fl.file_path NOT LIKE 'lock:%'
616 AND EXISTS (
617 SELECT 1 FROM tasks t
618 WHERE t.worker_id = fl.worker_id
619 AND t.status IN ('working', 'assigned')
620 )
621 ORDER BY fl.file_path",
622 )?;
623 stmt.query_map(params![worker_id], |row| {
624 Ok((
625 row.get::<_, String>(0)?,
626 row.get::<_, Option<String>>(1)?,
627 row.get::<_, String>(2)?,
628 ))
629 })?
630 .filter_map(|r| r.ok())
631 .collect()
632 };
633
634 Ok(results)
635 })
636 }
637
638 pub fn release_worker_locks(&self, worker_id: &str) -> Result<i32> {
640 let now = now_ms();
641
642 self.with_conn(|conn| {
643 conn.execute(
645 "UPDATE claim_sequence SET end_timestamp = ?1
646 WHERE worker_id = ?2 AND end_timestamp IS NULL",
647 params![now, worker_id],
648 )?;
649
650 let deleted = conn.execute(
651 "DELETE FROM file_locks WHERE worker_id = ?1",
652 params![worker_id],
653 )?;
654
655 Ok(deleted as i32)
656 })
657 }
658
659 pub fn release_task_locks(&self, task_id: &str) -> Result<i32> {
662 let now = now_ms();
663
664 self.with_conn(|conn| {
665 let files_to_release: Vec<(String, String)> = {
667 let mut stmt =
668 conn.prepare("SELECT file_path, worker_id FROM file_locks WHERE task_id = ?1")?;
669 stmt.query_map(params![task_id], |row| {
670 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
671 })?
672 .filter_map(|r| r.ok())
673 .collect()
674 };
675
676 for (file_path, worker_id) in &files_to_release {
678 conn.execute(
679 "UPDATE claim_sequence SET end_timestamp = ?1
680 WHERE file_path = ?2 AND worker_id = ?3 AND end_timestamp IS NULL",
681 params![now, file_path, worker_id],
682 )?;
683
684 conn.execute(
686 "INSERT INTO claim_sequence (file_path, worker_id, event, reason, timestamp)
687 VALUES (?1, ?2, 'released', 'task completed', ?3)",
688 params![file_path, worker_id, now],
689 )?;
690 }
691
692 let deleted = conn.execute(
693 "DELETE FROM file_locks WHERE task_id = ?1",
694 params![task_id],
695 )?;
696
697 Ok(deleted as i32)
698 })
699 }
700}