Skip to main content

task_graph_mcp/db/
locks.rs

1//! File lock operations (advisory) and claim tracking.
2
3use super::{Database, now_ms};
4use crate::types::{ClaimEvent, ClaimEventType, ClaimUpdates, FileLock};
5use anyhow::Result;
6use rusqlite::params;
7use std::collections::{HashMap, HashSet};
8
9impl Database {
10    /// Lock a file (advisory).
11    /// Returns Ok with optional warning if already locked by another worker.
12    pub fn lock_file(
13        &self,
14        file_path: String,
15        worker_id: &str,
16        reason: Option<String>,
17        task_id: Option<String>,
18    ) -> Result<Option<String>> {
19        let now = now_ms();
20
21        self.with_conn_mut(|conn| {
22            let tx = conn.transaction()?;
23            // Check if already locked
24            let existing: Option<String> = tx
25                .query_row(
26                    "SELECT worker_id FROM file_locks WHERE file_path = ?1",
27                    params![&file_path],
28                    |row| row.get(0),
29                )
30                .ok();
31
32            let result = if let Some(existing_worker) = existing {
33                if existing_worker != worker_id {
34                    // Locked by another worker - return warning
35                    Some(existing_worker)
36                } else {
37                    // Already locked by this worker - just update timestamp, reason, and task_id
38                    tx.execute(
39                        "UPDATE file_locks SET locked_at = ?1, reason = ?2, task_id = ?3 WHERE file_path = ?4",
40                        params![now, &reason, &task_id, &file_path],
41                    )?;
42                    None
43                }
44            } else {
45                // Not locked - create new lock
46                tx.execute(
47                    "INSERT INTO file_locks (file_path, worker_id, reason, locked_at, task_id) VALUES (?1, ?2, ?3, ?4, ?5)",
48                    params![&file_path, worker_id, &reason, now, &task_id],
49                )?;
50
51                // Record claim event for tracking
52                tx.execute(
53                    "INSERT INTO claim_sequence (file_path, worker_id, event, reason, timestamp) VALUES (?1, ?2, 'claimed', ?3, ?4)",
54                    params![&file_path, worker_id, &reason, now],
55                )?;
56                None
57            };
58
59            tx.commit()?;
60            Ok(result)
61        })
62    }
63
64    /// Unlock a file with optional reason for next claimant.
65    pub fn unlock_file(
66        &self,
67        file_path: &str,
68        worker_id: &str,
69        reason: Option<String>,
70    ) -> Result<bool> {
71        let now = now_ms();
72
73        self.with_conn_mut(|conn| {
74            let tx = conn.transaction()?;
75
76            let deleted = tx.execute(
77                "DELETE FROM file_locks WHERE file_path = ?1 AND worker_id = ?2",
78                params![file_path, worker_id],
79            )?;
80
81            if deleted > 0 {
82                // Find the claim_id for this file+worker (most recent claim)
83                let claim_id: Option<i64> = tx.query_row(
84                    "SELECT MAX(id) FROM claim_sequence
85                     WHERE file_path = ?1 AND worker_id = ?2 AND event = 'claimed'",
86                    params![file_path, worker_id],
87                    |row| row.get(0),
88                ).ok().flatten();
89
90                // Close any open claim for this file+worker
91                tx.execute(
92                    "UPDATE claim_sequence SET end_timestamp = ?1
93                     WHERE file_path = ?2 AND worker_id = ?3 AND end_timestamp IS NULL",
94                    params![now, file_path, worker_id],
95                )?;
96
97                // Record release event with claim_id reference
98                tx.execute(
99                    "INSERT INTO claim_sequence (file_path, worker_id, event, reason, timestamp, claim_id)
100                     VALUES (?1, ?2, 'released', ?3, ?4, ?5)",
101                    params![file_path, worker_id, &reason, now, claim_id],
102                )?;
103            }
104
105            tx.commit()?;
106            Ok(deleted > 0)
107        })
108    }
109
110    /// Unlock multiple files with verbose return.
111    /// Returns a list of (file_path, worker_id) pairs for files that were actually released.
112    pub fn unlock_files_verbose(
113        &self,
114        file_paths: Vec<String>,
115        worker_id: &str,
116        reason: Option<String>,
117    ) -> Result<Vec<(String, String)>> {
118        let now = now_ms();
119        let mut released = Vec::new();
120
121        self.with_conn_mut(|conn| {
122            let tx = conn.transaction()?;
123
124            for file_path in file_paths {
125                let deleted = tx.execute(
126                    "DELETE FROM file_locks WHERE file_path = ?1 AND worker_id = ?2",
127                    params![&file_path, worker_id],
128                )?;
129
130                if deleted > 0 {
131                    // Find the claim_id for this file+worker (most recent claim)
132                    let claim_id: Option<i64> = tx.query_row(
133                        "SELECT MAX(id) FROM claim_sequence
134                         WHERE file_path = ?1 AND worker_id = ?2 AND event = 'claimed'",
135                        params![&file_path, worker_id],
136                        |row| row.get(0),
137                    ).ok().flatten();
138
139                    // Close any open claim for this file+worker
140                    tx.execute(
141                        "UPDATE claim_sequence SET end_timestamp = ?1
142                         WHERE file_path = ?2 AND worker_id = ?3 AND end_timestamp IS NULL",
143                        params![now, &file_path, worker_id],
144                    )?;
145
146                    // Record release event with claim_id reference
147                    tx.execute(
148                        "INSERT INTO claim_sequence (file_path, worker_id, event, reason, timestamp, claim_id)
149                         VALUES (?1, ?2, 'released', ?3, ?4, ?5)",
150                        params![&file_path, worker_id, &reason, now, claim_id],
151                    )?;
152
153                    released.push((file_path, worker_id.to_string()));
154                }
155            }
156
157            tx.commit()?;
158            Ok(released)
159        })
160    }
161
162    /// Release all files held by a worker with verbose return.
163    /// Returns a list of (file_path, worker_id) pairs for files that were released.
164    pub fn release_worker_locks_verbose(
165        &self,
166        worker_id: &str,
167        reason: Option<String>,
168    ) -> Result<Vec<(String, String)>> {
169        let now = now_ms();
170
171        self.with_conn_mut(|conn| {
172            let tx = conn.transaction()?;
173
174            // Get files locked by this worker before deleting
175            let files_to_release: Vec<String> = {
176                let mut stmt =
177                    tx.prepare("SELECT file_path FROM file_locks WHERE worker_id = ?1")?;
178                stmt.query_map(params![worker_id], |row| row.get::<_, String>(0))?
179                    .filter_map(|r| r.ok())
180                    .collect()
181            };
182
183            if files_to_release.is_empty() {
184                tx.commit()?;
185                return Ok(Vec::new());
186            }
187
188            // Close any open claims for this worker
189            tx.execute(
190                "UPDATE claim_sequence SET end_timestamp = ?1
191                 WHERE worker_id = ?2 AND end_timestamp IS NULL",
192                params![now, worker_id],
193            )?;
194
195            // Record release events for each file
196            for file_path in &files_to_release {
197                tx.execute(
198                    "INSERT INTO claim_sequence (file_path, worker_id, event, reason, timestamp)
199                     VALUES (?1, ?2, 'released', ?3, ?4)",
200                    params![file_path, worker_id, &reason, now],
201                )?;
202            }
203
204            // Delete the locks
205            tx.execute(
206                "DELETE FROM file_locks WHERE worker_id = ?1",
207                params![worker_id],
208            )?;
209
210            tx.commit()?;
211
212            let released: Vec<(String, String)> = files_to_release
213                .into_iter()
214                .map(|f| (f, worker_id.to_string()))
215                .collect();
216
217            Ok(released)
218        })
219    }
220
221    /// Release all files associated with a task with verbose return.
222    /// Returns a list of (file_path, worker_id) pairs for files that were released.
223    pub fn release_task_locks_verbose(
224        &self,
225        task_id: &str,
226        reason: Option<String>,
227    ) -> Result<Vec<(String, String)>> {
228        let now = now_ms();
229
230        self.with_conn_mut(|conn| {
231            let tx = conn.transaction()?;
232
233            // Get files locked by this task before deleting
234            let files_to_release: Vec<(String, String)> = {
235                let mut stmt =
236                    tx.prepare("SELECT file_path, worker_id FROM file_locks WHERE task_id = ?1")?;
237                stmt.query_map(params![task_id], |row| {
238                    Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
239                })?
240                .filter_map(|r| r.ok())
241                .collect()
242            };
243
244            if files_to_release.is_empty() {
245                tx.commit()?;
246                return Ok(Vec::new());
247            }
248
249            // Close any open claims for these files
250            for (file_path, worker_id) in &files_to_release {
251                tx.execute(
252                    "UPDATE claim_sequence SET end_timestamp = ?1
253                     WHERE file_path = ?2 AND worker_id = ?3 AND end_timestamp IS NULL",
254                    params![now, file_path, worker_id],
255                )?;
256
257                // Record release event
258                let reason_str = reason.as_deref().unwrap_or("task release");
259                tx.execute(
260                    "INSERT INTO claim_sequence (file_path, worker_id, event, reason, timestamp)
261                     VALUES (?1, ?2, 'released', ?3, ?4)",
262                    params![file_path, worker_id, reason_str, now],
263                )?;
264            }
265
266            // Delete the locks
267            tx.execute(
268                "DELETE FROM file_locks WHERE task_id = ?1",
269                params![task_id],
270            )?;
271
272            tx.commit()?;
273            Ok(files_to_release)
274        })
275    }
276
277    /// Get claim updates since worker's last poll.
278    /// Returns all claim/release events since the agent's last poll position.
279    pub fn claim_updates(&self, worker_id: &str) -> Result<ClaimUpdates> {
280        self.with_conn(|conn| {
281            // Get worker's last sequence
282            let last_seq: i64 = conn
283                .query_row(
284                    "SELECT last_claim_sequence FROM workers WHERE id = ?1",
285                    params![worker_id],
286                    |row| row.get(0),
287                )
288                .unwrap_or(0);
289
290            // Get all new events since last sequence.
291            // We use >= because last_seq now represents "next event to fetch" (set to max+1 after each poll).
292            let mut stmt = conn.prepare(
293                "SELECT id, file_path, worker_id, event, reason, timestamp, end_timestamp, claim_id
294                 FROM claim_sequence
295                 WHERE id >= ?1
296                 ORDER BY id",
297            )?;
298            let events: Vec<ClaimEvent> = stmt
299                .query_map(params![last_seq], |row| {
300                    Ok(ClaimEvent {
301                        id: row.get(0)?,
302                        file_path: row.get(1)?,
303                        worker_id: row.get(2)?,
304                        event: ClaimEventType::parse(&row.get::<_, String>(3)?)
305                            .unwrap_or(ClaimEventType::Claimed),
306                        reason: row.get(4)?,
307                        timestamp: row.get(5)?,
308                        end_timestamp: row.get(6)?,
309                        claim_id: row.get(7)?,
310                    })
311                })?
312                .filter_map(|r| r.ok())
313                .collect();
314
315            // Find max sequence from events. After polling, we set last_seq = max + 1
316            // so that claims we just saw have claim_id < last_seq (for release filtering).
317            let max_seen = events.iter().map(|e| e.id).max();
318            let new_seq = match max_seen {
319                Some(max) => max + 1, // +1 so claims just polled satisfy claim_id < new_seq
320                None => last_seq,     // No events, keep current sequence
321            };
322
323            // Update worker's last sequence
324            if new_seq > last_seq {
325                conn.execute(
326                    "UPDATE workers SET last_claim_sequence = ?1 WHERE id = ?2",
327                    params![new_seq, worker_id],
328                )?;
329            }
330
331            // Separate into claims and releases
332            let new_claims: Vec<ClaimEvent> = events
333                .iter()
334                .filter(|e| e.event == ClaimEventType::Claimed)
335                .cloned()
336                .collect();
337
338            // For releases, only include if agent has polled and received the original claim.
339            // Include if claim_id < last_seq (strictly less - was in a previous poll, and
340            // last_seq is max+1 after each poll) OR claim_id is in current batch.
341            let new_claim_ids: HashSet<i64> = new_claims.iter().map(|c| c.id).collect();
342
343            let dropped_claims: Vec<ClaimEvent> = events
344                .iter()
345                .filter(|e| e.event == ClaimEventType::Released)
346                .filter(|release| {
347                    match release.claim_id {
348                        Some(cid) => cid < last_seq || new_claim_ids.contains(&cid),
349                        None => true, // Legacy releases without claim_id - include them
350                    }
351                })
352                .cloned()
353                .collect();
354
355            Ok(ClaimUpdates {
356                new_claims,
357                dropped_claims,
358                sequence: new_seq,
359            })
360        })
361    }
362
363    /// Get file locks with full details.
364    pub fn get_file_locks(
365        &self,
366        file_paths: Option<Vec<String>>,
367        agent_id: Option<&str>,
368        task_id: Option<&str>,
369    ) -> Result<HashMap<String, FileLock>> {
370        self.with_conn(|conn| {
371            let locks = if let Some(paths) = file_paths {
372                if paths.is_empty() {
373                    return Ok(HashMap::new());
374                }
375
376                let placeholders: Vec<String> = paths.iter().map(|_| "?".to_string()).collect();
377                let sql = format!(
378                    "SELECT file_path, worker_id, reason, locked_at, task_id FROM file_locks WHERE file_path IN ({})",
379                    placeholders.join(", ")
380                );
381
382                let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
383                for path in &paths {
384                    params_vec.push(Box::new(path.clone()));
385                }
386
387                let params_refs: Vec<&dyn rusqlite::ToSql> =
388                    params_vec.iter().map(|b| b.as_ref()).collect();
389
390                let mut stmt = conn.prepare(&sql)?;
391                stmt.query_map(params_refs.as_slice(), |row| {
392                    let file_path: String = row.get(0)?;
393                    Ok((file_path.clone(), FileLock {
394                        file_path,
395                        worker_id: row.get(1)?,
396                        reason: row.get(2)?,
397                        locked_at: row.get(3)?,
398                        task_id: row.get(4)?,
399                    }))
400                })?
401                .filter_map(|r| r.ok())
402                .collect()
403            } else if let Some(aid) = agent_id {
404                let mut stmt = conn.prepare(
405                    "SELECT file_path, worker_id, reason, locked_at, task_id FROM file_locks WHERE worker_id = ?1",
406                )?;
407                stmt.query_map(params![aid], |row| {
408                    let file_path: String = row.get(0)?;
409                    Ok((file_path.clone(), FileLock {
410                        file_path,
411                        worker_id: row.get(1)?,
412                        reason: row.get(2)?,
413                        locked_at: row.get(3)?,
414                        task_id: row.get(4)?,
415                    }))
416                })?
417                .filter_map(|r| r.ok())
418                .collect()
419            } else if let Some(tid) = task_id {
420                let mut stmt = conn.prepare(
421                    "SELECT file_path, worker_id, reason, locked_at, task_id FROM file_locks WHERE task_id = ?1",
422                )?;
423                stmt.query_map(params![tid], |row| {
424                    let file_path: String = row.get(0)?;
425                    Ok((file_path.clone(), FileLock {
426                        file_path,
427                        worker_id: row.get(1)?,
428                        reason: row.get(2)?,
429                        locked_at: row.get(3)?,
430                        task_id: row.get(4)?,
431                    }))
432                })?
433                .filter_map(|r| r.ok())
434                .collect()
435            } else {
436                // Return empty - we now require at least one filter
437                HashMap::new()
438            };
439
440            Ok(locks)
441        })
442    }
443
444    /// Get all file locks as FileLock objects.
445    pub fn get_all_file_locks(&self) -> Result<Vec<FileLock>> {
446        self.with_conn(|conn| {
447            let mut stmt = conn.prepare(
448                "SELECT file_path, worker_id, reason, locked_at, task_id FROM file_locks",
449            )?;
450
451            let locks = stmt
452                .query_map([], |row| {
453                    let file_path: String = row.get(0)?;
454                    let worker_id: String = row.get(1)?;
455                    let reason: Option<String> = row.get(2)?;
456                    let locked_at: i64 = row.get(3)?;
457                    let task_id: Option<String> = row.get(4)?;
458                    Ok(FileLock {
459                        file_path,
460                        worker_id,
461                        reason,
462                        locked_at,
463                        task_id,
464                    })
465                })?
466                .filter_map(|r| r.ok())
467                .collect();
468
469            Ok(locks)
470        })
471    }
472
473    /// Release all locks held by a worker.
474    pub fn release_worker_locks(&self, worker_id: &str) -> Result<i32> {
475        let now = now_ms();
476
477        self.with_conn(|conn| {
478            // Close any open claims for this worker
479            conn.execute(
480                "UPDATE claim_sequence SET end_timestamp = ?1
481                 WHERE worker_id = ?2 AND end_timestamp IS NULL",
482                params![now, worker_id],
483            )?;
484
485            let deleted = conn.execute(
486                "DELETE FROM file_locks WHERE worker_id = ?1",
487                params![worker_id],
488            )?;
489
490            Ok(deleted as i32)
491        })
492    }
493
494    /// Release all locks associated with a task.
495    /// Called automatically when a task completes.
496    pub fn release_task_locks(&self, task_id: &str) -> Result<i32> {
497        let now = now_ms();
498
499        self.with_conn(|conn| {
500            // Get files locked by this task before deleting
501            let files_to_release: Vec<(String, String)> = {
502                let mut stmt =
503                    conn.prepare("SELECT file_path, worker_id FROM file_locks WHERE task_id = ?1")?;
504                stmt.query_map(params![task_id], |row| {
505                    Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
506                })?
507                .filter_map(|r| r.ok())
508                .collect()
509            };
510
511            // Close any open claims for these files
512            for (file_path, worker_id) in &files_to_release {
513                conn.execute(
514                    "UPDATE claim_sequence SET end_timestamp = ?1
515                     WHERE file_path = ?2 AND worker_id = ?3 AND end_timestamp IS NULL",
516                    params![now, file_path, worker_id],
517                )?;
518
519                // Record release event
520                conn.execute(
521                    "INSERT INTO claim_sequence (file_path, worker_id, event, reason, timestamp)
522                     VALUES (?1, ?2, 'released', 'task completed', ?3)",
523                    params![file_path, worker_id, now],
524                )?;
525            }
526
527            let deleted = conn.execute(
528                "DELETE FROM file_locks WHERE task_id = ?1",
529                params![task_id],
530            )?;
531
532            Ok(deleted as i32)
533        })
534    }
535}