Skip to main content

task_graph_mcp/db/
locks.rs

1//! File lock operations (advisory and exclusive) and claim tracking.
2//!
3//! Two lock modes are supported:
4//! - **Advisory marks** (default): `mark_file("src/main.rs")` - warns if another agent
5//!   holds the mark, but allows it.
6//! - **Exclusive locks** (`lock:` prefix): `mark_file("lock:git-commit")` - rejects with
7//!   an error if another agent holds the lock. Used for mutual exclusion on shared resources.
8
9use super::{Database, now_ms};
10use crate::types::{ClaimEvent, ClaimEventType, ClaimUpdates, FileLock};
11use anyhow::Result;
12use rusqlite::params;
13use std::collections::{HashMap, HashSet};
14
/// Result of an exclusive lock attempt.
///
/// Derives `Debug`, `Clone`, `PartialEq` and `Eq` so that outcomes can be logged,
/// stored, and compared directly in assertions.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ExclusiveLockResult {
    /// Lock acquired successfully.
    Acquired,
    /// Lock already held by this agent (refreshed).
    AlreadyHeldBySelf,
    /// Lock held by another agent - cannot acquire. Carries the holder's id.
    HeldByOther(String),
}
24
25impl Database {
26    /// Acquire an exclusive lock on a resource.
27    ///
28    /// Unlike advisory `lock_file`, this method enforces mutual exclusion:
29    /// - If the lock is free, it is acquired.
30    /// - If the lock is already held by the same agent, it is refreshed (timestamp/reason updated).
31    /// - If the lock is held by another agent, returns `HeldByOther(agent_id)`.
32    ///
33    /// The `file_path` parameter stores the full `lock:resource` string for consistent
34    /// storage in the file_locks table.
35    pub fn lock_file_exclusive(
36        &self,
37        file_path: String,
38        worker_id: &str,
39        reason: Option<String>,
40        task_id: Option<String>,
41    ) -> Result<ExclusiveLockResult> {
42        let now = now_ms();
43
44        self.with_conn_mut(|conn| {
45            let tx = conn.transaction()?;
46
47            // Check if already locked
48            let existing: Option<String> = tx
49                .query_row(
50                    "SELECT worker_id FROM file_locks WHERE file_path = ?1",
51                    params![&file_path],
52                    |row| row.get(0),
53                )
54                .ok();
55
56            let result = if let Some(existing_worker) = existing {
57                if existing_worker != worker_id {
58                    // Locked by another worker - exclusive rejection
59                    ExclusiveLockResult::HeldByOther(existing_worker)
60                } else {
61                    // Already locked by this worker - refresh timestamp, reason, and task_id
62                    tx.execute(
63                        "UPDATE file_locks SET locked_at = ?1, reason = ?2, task_id = ?3 WHERE file_path = ?4",
64                        params![now, &reason, &task_id, &file_path],
65                    )?;
66                    ExclusiveLockResult::AlreadyHeldBySelf
67                }
68            } else {
69                // Not locked - create new lock
70                tx.execute(
71                    "INSERT INTO file_locks (file_path, worker_id, reason, locked_at, task_id) VALUES (?1, ?2, ?3, ?4, ?5)",
72                    params![&file_path, worker_id, &reason, now, &task_id],
73                )?;
74
75                // Record claim event for tracking
76                tx.execute(
77                    "INSERT INTO claim_sequence (file_path, worker_id, event, reason, timestamp) VALUES (?1, ?2, 'claimed', ?3, ?4)",
78                    params![&file_path, worker_id, &reason, now],
79                )?;
80                ExclusiveLockResult::Acquired
81            };
82
83            tx.commit()?;
84            Ok(result)
85        })
86    }
87
88    /// Lock a file (advisory).
89    /// Returns Ok with optional warning if already locked by another worker.
90    pub fn lock_file(
91        &self,
92        file_path: String,
93        worker_id: &str,
94        reason: Option<String>,
95        task_id: Option<String>,
96    ) -> Result<Option<String>> {
97        let now = now_ms();
98
99        self.with_conn_mut(|conn| {
100            let tx = conn.transaction()?;
101            // Check if already locked
102            let existing: Option<String> = tx
103                .query_row(
104                    "SELECT worker_id FROM file_locks WHERE file_path = ?1",
105                    params![&file_path],
106                    |row| row.get(0),
107                )
108                .ok();
109
110            let result = if let Some(existing_worker) = existing {
111                if existing_worker != worker_id {
112                    // Locked by another worker - return warning
113                    Some(existing_worker)
114                } else {
115                    // Already locked by this worker - just update timestamp, reason, and task_id
116                    tx.execute(
117                        "UPDATE file_locks SET locked_at = ?1, reason = ?2, task_id = ?3 WHERE file_path = ?4",
118                        params![now, &reason, &task_id, &file_path],
119                    )?;
120                    None
121                }
122            } else {
123                // Not locked - create new lock
124                tx.execute(
125                    "INSERT INTO file_locks (file_path, worker_id, reason, locked_at, task_id) VALUES (?1, ?2, ?3, ?4, ?5)",
126                    params![&file_path, worker_id, &reason, now, &task_id],
127                )?;
128
129                // Record claim event for tracking
130                tx.execute(
131                    "INSERT INTO claim_sequence (file_path, worker_id, event, reason, timestamp) VALUES (?1, ?2, 'claimed', ?3, ?4)",
132                    params![&file_path, worker_id, &reason, now],
133                )?;
134                None
135            };
136
137            tx.commit()?;
138            Ok(result)
139        })
140    }
141
142    /// Unlock a file with optional reason for next claimant.
143    pub fn unlock_file(
144        &self,
145        file_path: &str,
146        worker_id: &str,
147        reason: Option<String>,
148    ) -> Result<bool> {
149        let now = now_ms();
150
151        self.with_conn_mut(|conn| {
152            let tx = conn.transaction()?;
153
154            let deleted = tx.execute(
155                "DELETE FROM file_locks WHERE file_path = ?1 AND worker_id = ?2",
156                params![file_path, worker_id],
157            )?;
158
159            if deleted > 0 {
160                // Find the claim_id for this file+worker (most recent claim)
161                let claim_id: Option<i64> = tx.query_row(
162                    "SELECT MAX(id) FROM claim_sequence
163                     WHERE file_path = ?1 AND worker_id = ?2 AND event = 'claimed'",
164                    params![file_path, worker_id],
165                    |row| row.get(0),
166                ).ok().flatten();
167
168                // Close any open claim for this file+worker
169                tx.execute(
170                    "UPDATE claim_sequence SET end_timestamp = ?1
171                     WHERE file_path = ?2 AND worker_id = ?3 AND end_timestamp IS NULL",
172                    params![now, file_path, worker_id],
173                )?;
174
175                // Record release event with claim_id reference
176                tx.execute(
177                    "INSERT INTO claim_sequence (file_path, worker_id, event, reason, timestamp, claim_id)
178                     VALUES (?1, ?2, 'released', ?3, ?4, ?5)",
179                    params![file_path, worker_id, &reason, now, claim_id],
180                )?;
181            }
182
183            tx.commit()?;
184            Ok(deleted > 0)
185        })
186    }
187
188    /// Unlock multiple files with verbose return.
189    /// Returns a list of (file_path, worker_id) pairs for files that were actually released.
190    pub fn unlock_files_verbose(
191        &self,
192        file_paths: Vec<String>,
193        worker_id: &str,
194        reason: Option<String>,
195    ) -> Result<Vec<(String, String)>> {
196        let now = now_ms();
197        let mut released = Vec::new();
198
199        self.with_conn_mut(|conn| {
200            let tx = conn.transaction()?;
201
202            for file_path in file_paths {
203                let deleted = tx.execute(
204                    "DELETE FROM file_locks WHERE file_path = ?1 AND worker_id = ?2",
205                    params![&file_path, worker_id],
206                )?;
207
208                if deleted > 0 {
209                    // Find the claim_id for this file+worker (most recent claim)
210                    let claim_id: Option<i64> = tx.query_row(
211                        "SELECT MAX(id) FROM claim_sequence
212                         WHERE file_path = ?1 AND worker_id = ?2 AND event = 'claimed'",
213                        params![&file_path, worker_id],
214                        |row| row.get(0),
215                    ).ok().flatten();
216
217                    // Close any open claim for this file+worker
218                    tx.execute(
219                        "UPDATE claim_sequence SET end_timestamp = ?1
220                         WHERE file_path = ?2 AND worker_id = ?3 AND end_timestamp IS NULL",
221                        params![now, &file_path, worker_id],
222                    )?;
223
224                    // Record release event with claim_id reference
225                    tx.execute(
226                        "INSERT INTO claim_sequence (file_path, worker_id, event, reason, timestamp, claim_id)
227                         VALUES (?1, ?2, 'released', ?3, ?4, ?5)",
228                        params![&file_path, worker_id, &reason, now, claim_id],
229                    )?;
230
231                    released.push((file_path, worker_id.to_string()));
232                }
233            }
234
235            tx.commit()?;
236            Ok(released)
237        })
238    }
239
240    /// Release all files held by a worker with verbose return.
241    /// Returns a list of (file_path, worker_id) pairs for files that were released.
242    pub fn release_worker_locks_verbose(
243        &self,
244        worker_id: &str,
245        reason: Option<String>,
246    ) -> Result<Vec<(String, String)>> {
247        let now = now_ms();
248
249        self.with_conn_mut(|conn| {
250            let tx = conn.transaction()?;
251
252            // Get files locked by this worker before deleting
253            let files_to_release: Vec<String> = {
254                let mut stmt =
255                    tx.prepare("SELECT file_path FROM file_locks WHERE worker_id = ?1")?;
256                stmt.query_map(params![worker_id], |row| row.get::<_, String>(0))?
257                    .filter_map(|r| r.ok())
258                    .collect()
259            };
260
261            if files_to_release.is_empty() {
262                tx.commit()?;
263                return Ok(Vec::new());
264            }
265
266            // Close any open claims for this worker
267            tx.execute(
268                "UPDATE claim_sequence SET end_timestamp = ?1
269                 WHERE worker_id = ?2 AND end_timestamp IS NULL",
270                params![now, worker_id],
271            )?;
272
273            // Record release events for each file
274            for file_path in &files_to_release {
275                tx.execute(
276                    "INSERT INTO claim_sequence (file_path, worker_id, event, reason, timestamp)
277                     VALUES (?1, ?2, 'released', ?3, ?4)",
278                    params![file_path, worker_id, &reason, now],
279                )?;
280            }
281
282            // Delete the locks
283            tx.execute(
284                "DELETE FROM file_locks WHERE worker_id = ?1",
285                params![worker_id],
286            )?;
287
288            tx.commit()?;
289
290            let released: Vec<(String, String)> = files_to_release
291                .into_iter()
292                .map(|f| (f, worker_id.to_string()))
293                .collect();
294
295            Ok(released)
296        })
297    }
298
299    /// Release all files associated with a task with verbose return.
300    /// Returns a list of (file_path, worker_id) pairs for files that were released.
301    pub fn release_task_locks_verbose(
302        &self,
303        task_id: &str,
304        reason: Option<String>,
305    ) -> Result<Vec<(String, String)>> {
306        let now = now_ms();
307
308        self.with_conn_mut(|conn| {
309            let tx = conn.transaction()?;
310
311            // Get files locked by this task before deleting
312            let files_to_release: Vec<(String, String)> = {
313                let mut stmt =
314                    tx.prepare("SELECT file_path, worker_id FROM file_locks WHERE task_id = ?1")?;
315                stmt.query_map(params![task_id], |row| {
316                    Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
317                })?
318                .filter_map(|r| r.ok())
319                .collect()
320            };
321
322            if files_to_release.is_empty() {
323                tx.commit()?;
324                return Ok(Vec::new());
325            }
326
327            // Close any open claims for these files
328            for (file_path, worker_id) in &files_to_release {
329                tx.execute(
330                    "UPDATE claim_sequence SET end_timestamp = ?1
331                     WHERE file_path = ?2 AND worker_id = ?3 AND end_timestamp IS NULL",
332                    params![now, file_path, worker_id],
333                )?;
334
335                // Record release event
336                let reason_str = reason.as_deref().unwrap_or("task release");
337                tx.execute(
338                    "INSERT INTO claim_sequence (file_path, worker_id, event, reason, timestamp)
339                     VALUES (?1, ?2, 'released', ?3, ?4)",
340                    params![file_path, worker_id, reason_str, now],
341                )?;
342            }
343
344            // Delete the locks
345            tx.execute(
346                "DELETE FROM file_locks WHERE task_id = ?1",
347                params![task_id],
348            )?;
349
350            tx.commit()?;
351            Ok(files_to_release)
352        })
353    }
354
355    /// Get claim updates since worker's last poll.
356    /// Returns all claim/release events since the agent's last poll position.
357    pub fn claim_updates(&self, worker_id: &str) -> Result<ClaimUpdates> {
358        self.with_conn(|conn| {
359            // Get worker's last sequence
360            let last_seq: i64 = conn
361                .query_row(
362                    "SELECT last_claim_sequence FROM workers WHERE id = ?1",
363                    params![worker_id],
364                    |row| row.get(0),
365                )
366                .unwrap_or(0);
367
368            // Get all new events since last sequence.
369            // We use >= because last_seq now represents "next event to fetch" (set to max+1 after each poll).
370            let mut stmt = conn.prepare(
371                "SELECT id, file_path, worker_id, event, reason, timestamp, end_timestamp, claim_id
372                 FROM claim_sequence
373                 WHERE id >= ?1
374                 ORDER BY id",
375            )?;
376            let events: Vec<ClaimEvent> = stmt
377                .query_map(params![last_seq], |row| {
378                    Ok(ClaimEvent {
379                        id: row.get(0)?,
380                        file_path: row.get(1)?,
381                        worker_id: row.get(2)?,
382                        event: ClaimEventType::parse(&row.get::<_, String>(3)?)
383                            .unwrap_or(ClaimEventType::Claimed),
384                        reason: row.get(4)?,
385                        timestamp: row.get(5)?,
386                        end_timestamp: row.get(6)?,
387                        claim_id: row.get(7)?,
388                    })
389                })?
390                .filter_map(|r| r.ok())
391                .collect();
392
393            // Find max sequence from events. After polling, we set last_seq = max + 1
394            // so that claims we just saw have claim_id < last_seq (for release filtering).
395            let max_seen = events.iter().map(|e| e.id).max();
396            let new_seq = match max_seen {
397                Some(max) => max + 1, // +1 so claims just polled satisfy claim_id < new_seq
398                None => last_seq,     // No events, keep current sequence
399            };
400
401            // Update worker's last sequence
402            if new_seq > last_seq {
403                conn.execute(
404                    "UPDATE workers SET last_claim_sequence = ?1 WHERE id = ?2",
405                    params![new_seq, worker_id],
406                )?;
407            }
408
409            // Separate into claims and releases
410            let new_claims: Vec<ClaimEvent> = events
411                .iter()
412                .filter(|e| e.event == ClaimEventType::Claimed)
413                .cloned()
414                .collect();
415
416            // For releases, only include if agent has polled and received the original claim.
417            // Include if claim_id < last_seq (strictly less - was in a previous poll, and
418            // last_seq is max+1 after each poll) OR claim_id is in current batch.
419            let new_claim_ids: HashSet<i64> = new_claims.iter().map(|c| c.id).collect();
420
421            let dropped_claims: Vec<ClaimEvent> = events
422                .iter()
423                .filter(|e| e.event == ClaimEventType::Released)
424                .filter(|release| {
425                    match release.claim_id {
426                        Some(cid) => cid < last_seq || new_claim_ids.contains(&cid),
427                        None => true, // Legacy releases without claim_id - include them
428                    }
429                })
430                .cloned()
431                .collect();
432
433            Ok(ClaimUpdates {
434                new_claims,
435                dropped_claims,
436                sequence: new_seq,
437            })
438        })
439    }
440
441    /// Get file locks with full details.
442    pub fn get_file_locks(
443        &self,
444        file_paths: Option<Vec<String>>,
445        agent_id: Option<&str>,
446        task_id: Option<&str>,
447    ) -> Result<HashMap<String, FileLock>> {
448        self.with_conn(|conn| {
449            let locks = if let Some(paths) = file_paths {
450                if paths.is_empty() {
451                    return Ok(HashMap::new());
452                }
453
454                let placeholders: Vec<String> = paths.iter().map(|_| "?".to_string()).collect();
455                let sql = format!(
456                    "SELECT file_path, worker_id, reason, locked_at, task_id FROM file_locks WHERE file_path IN ({})",
457                    placeholders.join(", ")
458                );
459
460                let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
461                for path in &paths {
462                    params_vec.push(Box::new(path.clone()));
463                }
464
465                let params_refs: Vec<&dyn rusqlite::ToSql> =
466                    params_vec.iter().map(|b| b.as_ref()).collect();
467
468                let mut stmt = conn.prepare(&sql)?;
469                stmt.query_map(params_refs.as_slice(), |row| {
470                    let file_path: String = row.get(0)?;
471                    Ok((file_path.clone(), FileLock {
472                        file_path,
473                        worker_id: row.get(1)?,
474                        reason: row.get(2)?,
475                        locked_at: row.get(3)?,
476                        task_id: row.get(4)?,
477                    }))
478                })?
479                .filter_map(|r| r.ok())
480                .collect()
481            } else if let Some(aid) = agent_id {
482                let mut stmt = conn.prepare(
483                    "SELECT file_path, worker_id, reason, locked_at, task_id FROM file_locks WHERE worker_id = ?1",
484                )?;
485                stmt.query_map(params![aid], |row| {
486                    let file_path: String = row.get(0)?;
487                    Ok((file_path.clone(), FileLock {
488                        file_path,
489                        worker_id: row.get(1)?,
490                        reason: row.get(2)?,
491                        locked_at: row.get(3)?,
492                        task_id: row.get(4)?,
493                    }))
494                })?
495                .filter_map(|r| r.ok())
496                .collect()
497            } else if let Some(tid) = task_id {
498                let mut stmt = conn.prepare(
499                    "SELECT file_path, worker_id, reason, locked_at, task_id FROM file_locks WHERE task_id = ?1",
500                )?;
501                stmt.query_map(params![tid], |row| {
502                    let file_path: String = row.get(0)?;
503                    Ok((file_path.clone(), FileLock {
504                        file_path,
505                        worker_id: row.get(1)?,
506                        reason: row.get(2)?,
507                        locked_at: row.get(3)?,
508                        task_id: row.get(4)?,
509                    }))
510                })?
511                .filter_map(|r| r.ok())
512                .collect()
513            } else {
514                // Return empty - we now require at least one filter
515                HashMap::new()
516            };
517
518            Ok(locks)
519        })
520    }
521
522    /// Get all file locks as FileLock objects.
523    pub fn get_all_file_locks(&self) -> Result<Vec<FileLock>> {
524        self.with_conn(|conn| {
525            let mut stmt = conn.prepare(
526                "SELECT file_path, worker_id, reason, locked_at, task_id FROM file_locks",
527            )?;
528
529            let locks = stmt
530                .query_map([], |row| {
531                    let file_path: String = row.get(0)?;
532                    let worker_id: String = row.get(1)?;
533                    let reason: Option<String> = row.get(2)?;
534                    let locked_at: i64 = row.get(3)?;
535                    let task_id: Option<String> = row.get(4)?;
536                    Ok(FileLock {
537                        file_path,
538                        worker_id,
539                        reason,
540                        locked_at,
541                        task_id,
542                    })
543                })?
544                .filter_map(|r| r.ok())
545                .collect();
546
547            Ok(locks)
548        })
549    }
550
551    /// Release all locks held by a worker.
552    pub fn release_worker_locks(&self, worker_id: &str) -> Result<i32> {
553        let now = now_ms();
554
555        self.with_conn(|conn| {
556            // Close any open claims for this worker
557            conn.execute(
558                "UPDATE claim_sequence SET end_timestamp = ?1
559                 WHERE worker_id = ?2 AND end_timestamp IS NULL",
560                params![now, worker_id],
561            )?;
562
563            let deleted = conn.execute(
564                "DELETE FROM file_locks WHERE worker_id = ?1",
565                params![worker_id],
566            )?;
567
568            Ok(deleted as i32)
569        })
570    }
571
572    /// Release all locks associated with a task.
573    /// Called automatically when a task completes.
574    pub fn release_task_locks(&self, task_id: &str) -> Result<i32> {
575        let now = now_ms();
576
577        self.with_conn(|conn| {
578            // Get files locked by this task before deleting
579            let files_to_release: Vec<(String, String)> = {
580                let mut stmt =
581                    conn.prepare("SELECT file_path, worker_id FROM file_locks WHERE task_id = ?1")?;
582                stmt.query_map(params![task_id], |row| {
583                    Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
584                })?
585                .filter_map(|r| r.ok())
586                .collect()
587            };
588
589            // Close any open claims for these files
590            for (file_path, worker_id) in &files_to_release {
591                conn.execute(
592                    "UPDATE claim_sequence SET end_timestamp = ?1
593                     WHERE file_path = ?2 AND worker_id = ?3 AND end_timestamp IS NULL",
594                    params![now, file_path, worker_id],
595                )?;
596
597                // Record release event
598                conn.execute(
599                    "INSERT INTO claim_sequence (file_path, worker_id, event, reason, timestamp)
600                     VALUES (?1, ?2, 'released', 'task completed', ?3)",
601                    params![file_path, worker_id, now],
602                )?;
603            }
604
605            let deleted = conn.execute(
606                "DELETE FROM file_locks WHERE task_id = ?1",
607                params![task_id],
608            )?;
609
610            Ok(deleted as i32)
611        })
612    }
613}