Skip to main content

task_graph_mcp/db/
locks.rs

1//! File lock operations (advisory) and claim tracking.
2
3use super::{now_ms, Database};
4use crate::types::{ClaimEvent, ClaimEventType, ClaimUpdates, FileLock};
5use anyhow::Result;
6use rusqlite::params;
7use std::collections::{HashMap, HashSet};
8
9impl Database {
10    /// Lock a file (advisory).
11    /// Returns Ok with optional warning if already locked by another worker.
12    pub fn lock_file(&self, file_path: String, worker_id: &str, reason: Option<String>, task_id: Option<String>) -> Result<Option<String>> {
13        let now = now_ms();
14
15        self.with_conn_mut(|conn| {
16            let tx = conn.transaction()?;
17            // Check if already locked
18            let existing: Option<String> = tx
19                .query_row(
20                    "SELECT worker_id FROM file_locks WHERE file_path = ?1",
21                    params![&file_path],
22                    |row| row.get(0),
23                )
24                .ok();
25
26            let result = if let Some(existing_worker) = existing {
27                if existing_worker != worker_id {
28                    // Locked by another worker - return warning
29                    Some(existing_worker)
30                } else {
31                    // Already locked by this worker - just update timestamp, reason, and task_id
32                    tx.execute(
33                        "UPDATE file_locks SET locked_at = ?1, reason = ?2, task_id = ?3 WHERE file_path = ?4",
34                        params![now, &reason, &task_id, &file_path],
35                    )?;
36                    None
37                }
38            } else {
39                // Not locked - create new lock
40                tx.execute(
41                    "INSERT INTO file_locks (file_path, worker_id, reason, locked_at, task_id) VALUES (?1, ?2, ?3, ?4, ?5)",
42                    params![&file_path, worker_id, &reason, now, &task_id],
43                )?;
44
45                // Record claim event for tracking
46                tx.execute(
47                    "INSERT INTO claim_sequence (file_path, worker_id, event, reason, timestamp) VALUES (?1, ?2, 'claimed', ?3, ?4)",
48                    params![&file_path, worker_id, &reason, now],
49                )?;
50                None
51            };
52
53            tx.commit()?;
54            Ok(result)
55        })
56    }
57
58    /// Unlock a file with optional reason for next claimant.
59    pub fn unlock_file(&self, file_path: &str, worker_id: &str, reason: Option<String>) -> Result<bool> {
60        let now = now_ms();
61
62        self.with_conn_mut(|conn| {
63            let tx = conn.transaction()?;
64
65            let deleted = tx.execute(
66                "DELETE FROM file_locks WHERE file_path = ?1 AND worker_id = ?2",
67                params![file_path, worker_id],
68            )?;
69
70            if deleted > 0 {
71                // Find the claim_id for this file+worker (most recent claim)
72                let claim_id: Option<i64> = tx.query_row(
73                    "SELECT MAX(id) FROM claim_sequence
74                     WHERE file_path = ?1 AND worker_id = ?2 AND event = 'claimed'",
75                    params![file_path, worker_id],
76                    |row| row.get(0),
77                ).ok().flatten();
78
79                // Close any open claim for this file+worker
80                tx.execute(
81                    "UPDATE claim_sequence SET end_timestamp = ?1
82                     WHERE file_path = ?2 AND worker_id = ?3 AND end_timestamp IS NULL",
83                    params![now, file_path, worker_id],
84                )?;
85
86                // Record release event with claim_id reference
87                tx.execute(
88                    "INSERT INTO claim_sequence (file_path, worker_id, event, reason, timestamp, claim_id)
89                     VALUES (?1, ?2, 'released', ?3, ?4, ?5)",
90                    params![file_path, worker_id, &reason, now, claim_id],
91                )?;
92            }
93
94            tx.commit()?;
95            Ok(deleted > 0)
96        })
97    }
98
99    /// Unlock multiple files with verbose return.
100    /// Returns a list of (file_path, worker_id) pairs for files that were actually released.
101    pub fn unlock_files_verbose(
102        &self,
103        file_paths: Vec<String>,
104        worker_id: &str,
105        reason: Option<String>,
106    ) -> Result<Vec<(String, String)>> {
107        let now = now_ms();
108        let mut released = Vec::new();
109
110        self.with_conn_mut(|conn| {
111            let tx = conn.transaction()?;
112
113            for file_path in file_paths {
114                let deleted = tx.execute(
115                    "DELETE FROM file_locks WHERE file_path = ?1 AND worker_id = ?2",
116                    params![&file_path, worker_id],
117                )?;
118
119                if deleted > 0 {
120                    // Find the claim_id for this file+worker (most recent claim)
121                    let claim_id: Option<i64> = tx.query_row(
122                        "SELECT MAX(id) FROM claim_sequence
123                         WHERE file_path = ?1 AND worker_id = ?2 AND event = 'claimed'",
124                        params![&file_path, worker_id],
125                        |row| row.get(0),
126                    ).ok().flatten();
127
128                    // Close any open claim for this file+worker
129                    tx.execute(
130                        "UPDATE claim_sequence SET end_timestamp = ?1
131                         WHERE file_path = ?2 AND worker_id = ?3 AND end_timestamp IS NULL",
132                        params![now, &file_path, worker_id],
133                    )?;
134
135                    // Record release event with claim_id reference
136                    tx.execute(
137                        "INSERT INTO claim_sequence (file_path, worker_id, event, reason, timestamp, claim_id)
138                         VALUES (?1, ?2, 'released', ?3, ?4, ?5)",
139                        params![&file_path, worker_id, &reason, now, claim_id],
140                    )?;
141
142                    released.push((file_path, worker_id.to_string()));
143                }
144            }
145
146            tx.commit()?;
147            Ok(released)
148        })
149    }
150
151    /// Release all files held by a worker with verbose return.
152    /// Returns a list of (file_path, worker_id) pairs for files that were released.
153    pub fn release_worker_locks_verbose(&self, worker_id: &str, reason: Option<String>) -> Result<Vec<(String, String)>> {
154        let now = now_ms();
155
156        self.with_conn_mut(|conn| {
157            let tx = conn.transaction()?;
158
159            // Get files locked by this worker before deleting
160            let files_to_release: Vec<String> = {
161                let mut stmt = tx.prepare(
162                    "SELECT file_path FROM file_locks WHERE worker_id = ?1"
163                )?;
164                stmt.query_map(params![worker_id], |row| row.get::<_, String>(0))?
165                    .filter_map(|r| r.ok())
166                    .collect()
167            };
168
169            if files_to_release.is_empty() {
170                tx.commit()?;
171                return Ok(Vec::new());
172            }
173
174            // Close any open claims for this worker
175            tx.execute(
176                "UPDATE claim_sequence SET end_timestamp = ?1
177                 WHERE worker_id = ?2 AND end_timestamp IS NULL",
178                params![now, worker_id],
179            )?;
180
181            // Record release events for each file
182            for file_path in &files_to_release {
183                tx.execute(
184                    "INSERT INTO claim_sequence (file_path, worker_id, event, reason, timestamp)
185                     VALUES (?1, ?2, 'released', ?3, ?4)",
186                    params![file_path, worker_id, &reason, now],
187                )?;
188            }
189
190            // Delete the locks
191            tx.execute(
192                "DELETE FROM file_locks WHERE worker_id = ?1",
193                params![worker_id],
194            )?;
195
196            tx.commit()?;
197
198            let released: Vec<(String, String)> = files_to_release
199                .into_iter()
200                .map(|f| (f, worker_id.to_string()))
201                .collect();
202
203            Ok(released)
204        })
205    }
206
207    /// Release all files associated with a task with verbose return.
208    /// Returns a list of (file_path, worker_id) pairs for files that were released.
209    pub fn release_task_locks_verbose(&self, task_id: &str, reason: Option<String>) -> Result<Vec<(String, String)>> {
210        let now = now_ms();
211
212        self.with_conn_mut(|conn| {
213            let tx = conn.transaction()?;
214
215            // Get files locked by this task before deleting
216            let files_to_release: Vec<(String, String)> = {
217                let mut stmt = tx.prepare(
218                    "SELECT file_path, worker_id FROM file_locks WHERE task_id = ?1"
219                )?;
220                stmt.query_map(params![task_id], |row| {
221                    Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
222                })?
223                .filter_map(|r| r.ok())
224                .collect()
225            };
226
227            if files_to_release.is_empty() {
228                tx.commit()?;
229                return Ok(Vec::new());
230            }
231
232            // Close any open claims for these files
233            for (file_path, worker_id) in &files_to_release {
234                tx.execute(
235                    "UPDATE claim_sequence SET end_timestamp = ?1
236                     WHERE file_path = ?2 AND worker_id = ?3 AND end_timestamp IS NULL",
237                    params![now, file_path, worker_id],
238                )?;
239
240                // Record release event
241                let reason_str = reason.as_deref().unwrap_or("task release");
242                tx.execute(
243                    "INSERT INTO claim_sequence (file_path, worker_id, event, reason, timestamp)
244                     VALUES (?1, ?2, 'released', ?3, ?4)",
245                    params![file_path, worker_id, reason_str, now],
246                )?;
247            }
248
249            // Delete the locks
250            tx.execute(
251                "DELETE FROM file_locks WHERE task_id = ?1",
252                params![task_id],
253            )?;
254
255            tx.commit()?;
256            Ok(files_to_release)
257        })
258    }
259
260    /// Get claim updates since worker's last poll.
261    /// Returns all claim/release events since the agent's last poll position.
262    pub fn claim_updates(&self, worker_id: &str) -> Result<ClaimUpdates> {
263        self.with_conn(|conn| {
264            // Get worker's last sequence
265            let last_seq: i64 = conn
266                .query_row(
267                    "SELECT last_claim_sequence FROM workers WHERE id = ?1",
268                    params![worker_id],
269                    |row| row.get(0),
270                )
271                .unwrap_or(0);
272
273            // Get all new events since last sequence.
274            // We use >= because last_seq now represents "next event to fetch" (set to max+1 after each poll).
275            let mut stmt = conn.prepare(
276                "SELECT id, file_path, worker_id, event, reason, timestamp, end_timestamp, claim_id
277                 FROM claim_sequence
278                 WHERE id >= ?1
279                 ORDER BY id"
280            )?;
281            let events: Vec<ClaimEvent> = stmt.query_map(params![last_seq], |row| {
282                Ok(ClaimEvent {
283                    id: row.get(0)?,
284                    file_path: row.get(1)?,
285                    worker_id: row.get(2)?,
286                    event: ClaimEventType::from_str(&row.get::<_, String>(3)?).unwrap_or(ClaimEventType::Claimed),
287                    reason: row.get(4)?,
288                    timestamp: row.get(5)?,
289                    end_timestamp: row.get(6)?,
290                    claim_id: row.get(7)?,
291                })
292            })?
293            .filter_map(|r| r.ok())
294            .collect();
295
296            // Find max sequence from events. After polling, we set last_seq = max + 1
297            // so that claims we just saw have claim_id < last_seq (for release filtering).
298            let max_seen = events.iter().map(|e| e.id).max();
299            let new_seq = match max_seen {
300                Some(max) => max + 1,  // +1 so claims just polled satisfy claim_id < new_seq
301                None => last_seq,       // No events, keep current sequence
302            };
303
304            // Update worker's last sequence
305            if new_seq > last_seq {
306                conn.execute(
307                    "UPDATE workers SET last_claim_sequence = ?1 WHERE id = ?2",
308                    params![new_seq, worker_id],
309                )?;
310            }
311
312            // Separate into claims and releases
313            let new_claims: Vec<ClaimEvent> = events.iter()
314                .filter(|e| e.event == ClaimEventType::Claimed)
315                .cloned()
316                .collect();
317
318            // For releases, only include if agent has polled and received the original claim.
319            // Include if claim_id < last_seq (strictly less - was in a previous poll, and
320            // last_seq is max+1 after each poll) OR claim_id is in current batch.
321            let new_claim_ids: HashSet<i64> = new_claims.iter()
322                .map(|c| c.id)
323                .collect();
324
325            let dropped_claims: Vec<ClaimEvent> = events.iter()
326                .filter(|e| e.event == ClaimEventType::Released)
327                .filter(|release| {
328                    match release.claim_id {
329                        Some(cid) => cid < last_seq || new_claim_ids.contains(&cid),
330                        None => true, // Legacy releases without claim_id - include them
331                    }
332                })
333                .cloned()
334                .collect();
335
336            Ok(ClaimUpdates {
337                new_claims,
338                dropped_claims,
339                sequence: new_seq,
340            })
341        })
342    }
343
344    /// Get file locks with full details.
345    pub fn get_file_locks(
346        &self,
347        file_paths: Option<Vec<String>>,
348        agent_id: Option<&str>,
349        task_id: Option<&str>,
350    ) -> Result<HashMap<String, FileLock>> {
351        self.with_conn(|conn| {
352            let locks = if let Some(paths) = file_paths {
353                if paths.is_empty() {
354                    return Ok(HashMap::new());
355                }
356
357                let placeholders: Vec<String> = paths.iter().map(|_| "?".to_string()).collect();
358                let sql = format!(
359                    "SELECT file_path, worker_id, reason, locked_at, task_id FROM file_locks WHERE file_path IN ({})",
360                    placeholders.join(", ")
361                );
362
363                let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
364                for path in &paths {
365                    params_vec.push(Box::new(path.clone()));
366                }
367
368                let params_refs: Vec<&dyn rusqlite::ToSql> =
369                    params_vec.iter().map(|b| b.as_ref()).collect();
370
371                let mut stmt = conn.prepare(&sql)?;
372                stmt.query_map(params_refs.as_slice(), |row| {
373                    let file_path: String = row.get(0)?;
374                    Ok((file_path.clone(), FileLock {
375                        file_path,
376                        worker_id: row.get(1)?,
377                        reason: row.get(2)?,
378                        locked_at: row.get(3)?,
379                        task_id: row.get(4)?,
380                    }))
381                })?
382                .filter_map(|r| r.ok())
383                .collect()
384            } else if let Some(aid) = agent_id {
385                let mut stmt = conn.prepare(
386                    "SELECT file_path, worker_id, reason, locked_at, task_id FROM file_locks WHERE worker_id = ?1",
387                )?;
388                stmt.query_map(params![aid], |row| {
389                    let file_path: String = row.get(0)?;
390                    Ok((file_path.clone(), FileLock {
391                        file_path,
392                        worker_id: row.get(1)?,
393                        reason: row.get(2)?,
394                        locked_at: row.get(3)?,
395                        task_id: row.get(4)?,
396                    }))
397                })?
398                .filter_map(|r| r.ok())
399                .collect()
400            } else if let Some(tid) = task_id {
401                let mut stmt = conn.prepare(
402                    "SELECT file_path, worker_id, reason, locked_at, task_id FROM file_locks WHERE task_id = ?1",
403                )?;
404                stmt.query_map(params![tid], |row| {
405                    let file_path: String = row.get(0)?;
406                    Ok((file_path.clone(), FileLock {
407                        file_path,
408                        worker_id: row.get(1)?,
409                        reason: row.get(2)?,
410                        locked_at: row.get(3)?,
411                        task_id: row.get(4)?,
412                    }))
413                })?
414                .filter_map(|r| r.ok())
415                .collect()
416            } else {
417                // Return empty - we now require at least one filter
418                HashMap::new()
419            };
420
421            Ok(locks)
422        })
423    }
424
425    /// Get all file locks as FileLock objects.
426    pub fn get_all_file_locks(&self) -> Result<Vec<FileLock>> {
427        self.with_conn(|conn| {
428            let mut stmt =
429                conn.prepare("SELECT file_path, worker_id, reason, locked_at, task_id FROM file_locks")?;
430
431            let locks = stmt
432                .query_map([], |row| {
433                    let file_path: String = row.get(0)?;
434                    let worker_id: String = row.get(1)?;
435                    let reason: Option<String> = row.get(2)?;
436                    let locked_at: i64 = row.get(3)?;
437                    let task_id: Option<String> = row.get(4)?;
438                    Ok(FileLock {
439                        file_path,
440                        worker_id,
441                        reason,
442                        locked_at,
443                        task_id,
444                    })
445                })?
446                .filter_map(|r| r.ok())
447                .collect();
448
449            Ok(locks)
450        })
451    }
452
453    /// Release all locks held by a worker.
454    pub fn release_worker_locks(&self, worker_id: &str) -> Result<i32> {
455        let now = now_ms();
456
457        self.with_conn(|conn| {
458            // Close any open claims for this worker
459            conn.execute(
460                "UPDATE claim_sequence SET end_timestamp = ?1
461                 WHERE worker_id = ?2 AND end_timestamp IS NULL",
462                params![now, worker_id],
463            )?;
464
465            let deleted = conn.execute(
466                "DELETE FROM file_locks WHERE worker_id = ?1",
467                params![worker_id],
468            )?;
469
470            Ok(deleted as i32)
471        })
472    }
473
474
475    /// Release all locks associated with a task.
476    /// Called automatically when a task completes.
477    pub fn release_task_locks(&self, task_id: &str) -> Result<i32> {
478        let now = now_ms();
479
480        self.with_conn(|conn| {
481            // Get files locked by this task before deleting
482            let files_to_release: Vec<(String, String)> = {
483                let mut stmt = conn.prepare(
484                    "SELECT file_path, worker_id FROM file_locks WHERE task_id = ?1"
485                )?;
486                stmt.query_map(params![task_id], |row| {
487                    Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
488                })?
489                .filter_map(|r| r.ok())
490                .collect()
491            };
492
493            // Close any open claims for these files
494            for (file_path, worker_id) in &files_to_release {
495                conn.execute(
496                    "UPDATE claim_sequence SET end_timestamp = ?1
497                     WHERE file_path = ?2 AND worker_id = ?3 AND end_timestamp IS NULL",
498                    params![now, file_path, worker_id],
499                )?;
500
501                // Record release event
502                conn.execute(
503                    "INSERT INTO claim_sequence (file_path, worker_id, event, reason, timestamp)
504                     VALUES (?1, ?2, 'released', 'task completed', ?3)",
505                    params![file_path, worker_id, now],
506                )?;
507            }
508
509            let deleted = conn.execute(
510                "DELETE FROM file_locks WHERE task_id = ?1",
511                params![task_id],
512            )?;
513
514            Ok(deleted as i32)
515        })
516    }
517}