task-graph-mcp 0.2.2

MCP server for agent task workflows with phases, prompts, gates, and multi-agent coordination
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
//! File lock operations (advisory and exclusive) and claim tracking.
//!
//! Two lock modes are supported:
//! - **Advisory marks** (default): `mark_file("src/main.rs")` - warns if another agent
//!   holds the mark, but allows it.
//! - **Exclusive locks** (`lock:` prefix): `mark_file("lock:git-commit")` - rejects with
//!   an error if another agent holds the lock. Used for mutual exclusion on shared resources.

use super::{Database, now_ms};
use crate::types::{ClaimEvent, ClaimEventType, ClaimUpdates, FileLock};
use anyhow::Result;
use rusqlite::params;
use std::collections::{HashMap, HashSet};

/// Result of an exclusive lock attempt.
pub enum ExclusiveLockResult {
    /// Lock acquired successfully.
    Acquired,
    /// Lock already held by this agent (refreshed).
    AlreadyHeldBySelf,
    /// Lock held by another agent - cannot acquire.
    HeldByOther(String),
}

impl Database {
    /// Acquire an exclusive lock on a resource.
    ///
    /// Unlike advisory `lock_file`, this method enforces mutual exclusion:
    /// - If the lock is free, it is acquired.
    /// - If the lock is already held by the same agent, it is refreshed (timestamp/reason updated).
    /// - If the lock is held by another agent, returns `HeldByOther(agent_id)`.
    ///
    /// The `file_path` parameter stores the full `lock:resource` string for consistent
    /// storage in the file_locks table.
    pub fn lock_file_exclusive(
        &self,
        file_path: String,
        worker_id: &str,
        reason: Option<String>,
        task_id: Option<String>,
    ) -> Result<ExclusiveLockResult> {
        let now = now_ms();

        self.with_conn_mut(|conn| {
            let tx = conn.transaction()?;

            // Check if already locked
            let existing: Option<String> = tx
                .query_row(
                    "SELECT worker_id FROM file_locks WHERE file_path = ?1",
                    params![&file_path],
                    |row| row.get(0),
                )
                .ok();

            let result = if let Some(existing_worker) = existing {
                if existing_worker != worker_id {
                    // Locked by another worker - exclusive rejection
                    ExclusiveLockResult::HeldByOther(existing_worker)
                } else {
                    // Already locked by this worker - refresh timestamp, reason, and task_id
                    tx.execute(
                        "UPDATE file_locks SET locked_at = ?1, reason = ?2, task_id = ?3 WHERE file_path = ?4",
                        params![now, &reason, &task_id, &file_path],
                    )?;
                    ExclusiveLockResult::AlreadyHeldBySelf
                }
            } else {
                // Not locked - create new lock
                tx.execute(
                    "INSERT INTO file_locks (file_path, worker_id, reason, locked_at, task_id) VALUES (?1, ?2, ?3, ?4, ?5)",
                    params![&file_path, worker_id, &reason, now, &task_id],
                )?;

                // Record claim event for tracking
                tx.execute(
                    "INSERT INTO claim_sequence (file_path, worker_id, event, reason, timestamp) VALUES (?1, ?2, 'claimed', ?3, ?4)",
                    params![&file_path, worker_id, &reason, now],
                )?;
                ExclusiveLockResult::Acquired
            };

            tx.commit()?;
            Ok(result)
        })
    }

    /// Lock a file (advisory).
    /// Returns Ok with optional warning if already locked by another worker.
    pub fn lock_file(
        &self,
        file_path: String,
        worker_id: &str,
        reason: Option<String>,
        task_id: Option<String>,
    ) -> Result<Option<String>> {
        let now = now_ms();

        self.with_conn_mut(|conn| {
            let tx = conn.transaction()?;
            // Check if already locked
            let existing: Option<String> = tx
                .query_row(
                    "SELECT worker_id FROM file_locks WHERE file_path = ?1",
                    params![&file_path],
                    |row| row.get(0),
                )
                .ok();

            let result = if let Some(existing_worker) = existing {
                if existing_worker != worker_id {
                    // Locked by another worker - return warning
                    Some(existing_worker)
                } else {
                    // Already locked by this worker - just update timestamp, reason, and task_id
                    tx.execute(
                        "UPDATE file_locks SET locked_at = ?1, reason = ?2, task_id = ?3 WHERE file_path = ?4",
                        params![now, &reason, &task_id, &file_path],
                    )?;
                    None
                }
            } else {
                // Not locked - create new lock
                tx.execute(
                    "INSERT INTO file_locks (file_path, worker_id, reason, locked_at, task_id) VALUES (?1, ?2, ?3, ?4, ?5)",
                    params![&file_path, worker_id, &reason, now, &task_id],
                )?;

                // Record claim event for tracking
                tx.execute(
                    "INSERT INTO claim_sequence (file_path, worker_id, event, reason, timestamp) VALUES (?1, ?2, 'claimed', ?3, ?4)",
                    params![&file_path, worker_id, &reason, now],
                )?;
                None
            };

            tx.commit()?;
            Ok(result)
        })
    }

    /// Unlock a file with optional reason for next claimant.
    pub fn unlock_file(
        &self,
        file_path: &str,
        worker_id: &str,
        reason: Option<String>,
    ) -> Result<bool> {
        let now = now_ms();

        self.with_conn_mut(|conn| {
            let tx = conn.transaction()?;

            let deleted = tx.execute(
                "DELETE FROM file_locks WHERE file_path = ?1 AND worker_id = ?2",
                params![file_path, worker_id],
            )?;

            if deleted > 0 {
                // Find the claim_id for this file+worker (most recent claim)
                let claim_id: Option<i64> = tx.query_row(
                    "SELECT MAX(id) FROM claim_sequence
                     WHERE file_path = ?1 AND worker_id = ?2 AND event = 'claimed'",
                    params![file_path, worker_id],
                    |row| row.get(0),
                ).ok().flatten();

                // Close any open claim for this file+worker
                tx.execute(
                    "UPDATE claim_sequence SET end_timestamp = ?1
                     WHERE file_path = ?2 AND worker_id = ?3 AND end_timestamp IS NULL",
                    params![now, file_path, worker_id],
                )?;

                // Record release event with claim_id reference
                tx.execute(
                    "INSERT INTO claim_sequence (file_path, worker_id, event, reason, timestamp, claim_id)
                     VALUES (?1, ?2, 'released', ?3, ?4, ?5)",
                    params![file_path, worker_id, &reason, now, claim_id],
                )?;
            }

            tx.commit()?;
            Ok(deleted > 0)
        })
    }

    /// Unlock multiple files with verbose return.
    /// Returns a list of (file_path, worker_id) pairs for files that were actually released.
    pub fn unlock_files_verbose(
        &self,
        file_paths: Vec<String>,
        worker_id: &str,
        reason: Option<String>,
    ) -> Result<Vec<(String, String)>> {
        let now = now_ms();
        let mut released = Vec::new();

        self.with_conn_mut(|conn| {
            let tx = conn.transaction()?;

            for file_path in file_paths {
                let deleted = tx.execute(
                    "DELETE FROM file_locks WHERE file_path = ?1 AND worker_id = ?2",
                    params![&file_path, worker_id],
                )?;

                if deleted > 0 {
                    // Find the claim_id for this file+worker (most recent claim)
                    let claim_id: Option<i64> = tx.query_row(
                        "SELECT MAX(id) FROM claim_sequence
                         WHERE file_path = ?1 AND worker_id = ?2 AND event = 'claimed'",
                        params![&file_path, worker_id],
                        |row| row.get(0),
                    ).ok().flatten();

                    // Close any open claim for this file+worker
                    tx.execute(
                        "UPDATE claim_sequence SET end_timestamp = ?1
                         WHERE file_path = ?2 AND worker_id = ?3 AND end_timestamp IS NULL",
                        params![now, &file_path, worker_id],
                    )?;

                    // Record release event with claim_id reference
                    tx.execute(
                        "INSERT INTO claim_sequence (file_path, worker_id, event, reason, timestamp, claim_id)
                         VALUES (?1, ?2, 'released', ?3, ?4, ?5)",
                        params![&file_path, worker_id, &reason, now, claim_id],
                    )?;

                    released.push((file_path, worker_id.to_string()));
                }
            }

            tx.commit()?;
            Ok(released)
        })
    }

    /// Release all files held by a worker with verbose return.
    /// Returns a list of (file_path, worker_id) pairs for files that were released.
    pub fn release_worker_locks_verbose(
        &self,
        worker_id: &str,
        reason: Option<String>,
    ) -> Result<Vec<(String, String)>> {
        let now = now_ms();

        self.with_conn_mut(|conn| {
            let tx = conn.transaction()?;

            // Get files locked by this worker before deleting
            let files_to_release: Vec<String> = {
                let mut stmt =
                    tx.prepare("SELECT file_path FROM file_locks WHERE worker_id = ?1")?;
                stmt.query_map(params![worker_id], |row| row.get::<_, String>(0))?
                    .filter_map(|r| r.ok())
                    .collect()
            };

            if files_to_release.is_empty() {
                tx.commit()?;
                return Ok(Vec::new());
            }

            // Close any open claims for this worker
            tx.execute(
                "UPDATE claim_sequence SET end_timestamp = ?1
                 WHERE worker_id = ?2 AND end_timestamp IS NULL",
                params![now, worker_id],
            )?;

            // Record release events for each file
            for file_path in &files_to_release {
                tx.execute(
                    "INSERT INTO claim_sequence (file_path, worker_id, event, reason, timestamp)
                     VALUES (?1, ?2, 'released', ?3, ?4)",
                    params![file_path, worker_id, &reason, now],
                )?;
            }

            // Delete the locks
            tx.execute(
                "DELETE FROM file_locks WHERE worker_id = ?1",
                params![worker_id],
            )?;

            tx.commit()?;

            let released: Vec<(String, String)> = files_to_release
                .into_iter()
                .map(|f| (f, worker_id.to_string()))
                .collect();

            Ok(released)
        })
    }

    /// Release all files associated with a task with verbose return.
    /// Returns a list of (file_path, worker_id) pairs for files that were released.
    pub fn release_task_locks_verbose(
        &self,
        task_id: &str,
        reason: Option<String>,
    ) -> Result<Vec<(String, String)>> {
        let now = now_ms();

        self.with_conn_mut(|conn| {
            let tx = conn.transaction()?;

            // Get files locked by this task before deleting
            let files_to_release: Vec<(String, String)> = {
                let mut stmt =
                    tx.prepare("SELECT file_path, worker_id FROM file_locks WHERE task_id = ?1")?;
                stmt.query_map(params![task_id], |row| {
                    Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
                })?
                .filter_map(|r| r.ok())
                .collect()
            };

            if files_to_release.is_empty() {
                tx.commit()?;
                return Ok(Vec::new());
            }

            // Close any open claims for these files
            for (file_path, worker_id) in &files_to_release {
                tx.execute(
                    "UPDATE claim_sequence SET end_timestamp = ?1
                     WHERE file_path = ?2 AND worker_id = ?3 AND end_timestamp IS NULL",
                    params![now, file_path, worker_id],
                )?;

                // Record release event
                let reason_str = reason.as_deref().unwrap_or("task release");
                tx.execute(
                    "INSERT INTO claim_sequence (file_path, worker_id, event, reason, timestamp)
                     VALUES (?1, ?2, 'released', ?3, ?4)",
                    params![file_path, worker_id, reason_str, now],
                )?;
            }

            // Delete the locks
            tx.execute(
                "DELETE FROM file_locks WHERE task_id = ?1",
                params![task_id],
            )?;

            tx.commit()?;
            Ok(files_to_release)
        })
    }

    /// Get claim updates since worker's last poll.
    /// Returns all claim/release events since the agent's last poll position.
    pub fn claim_updates(&self, worker_id: &str) -> Result<ClaimUpdates> {
        self.with_conn(|conn| {
            // Get worker's last sequence
            let last_seq: i64 = conn
                .query_row(
                    "SELECT last_claim_sequence FROM workers WHERE id = ?1",
                    params![worker_id],
                    |row| row.get(0),
                )
                .unwrap_or(0);

            // Get all new events since last sequence.
            // We use >= because last_seq now represents "next event to fetch" (set to max+1 after each poll).
            let mut stmt = conn.prepare(
                "SELECT id, file_path, worker_id, event, reason, timestamp, end_timestamp, claim_id
                 FROM claim_sequence
                 WHERE id >= ?1
                 ORDER BY id",
            )?;
            let events: Vec<ClaimEvent> = stmt
                .query_map(params![last_seq], |row| {
                    Ok(ClaimEvent {
                        id: row.get(0)?,
                        file_path: row.get(1)?,
                        worker_id: row.get(2)?,
                        event: ClaimEventType::parse(&row.get::<_, String>(3)?)
                            .unwrap_or(ClaimEventType::Claimed),
                        reason: row.get(4)?,
                        timestamp: row.get(5)?,
                        end_timestamp: row.get(6)?,
                        claim_id: row.get(7)?,
                    })
                })?
                .filter_map(|r| r.ok())
                .collect();

            // Find max sequence from events. After polling, we set last_seq = max + 1
            // so that claims we just saw have claim_id < last_seq (for release filtering).
            let max_seen = events.iter().map(|e| e.id).max();
            let new_seq = match max_seen {
                Some(max) => max + 1, // +1 so claims just polled satisfy claim_id < new_seq
                None => last_seq,     // No events, keep current sequence
            };

            // Update worker's last sequence
            if new_seq > last_seq {
                conn.execute(
                    "UPDATE workers SET last_claim_sequence = ?1 WHERE id = ?2",
                    params![new_seq, worker_id],
                )?;
            }

            // Separate into claims and releases
            let new_claims: Vec<ClaimEvent> = events
                .iter()
                .filter(|e| e.event == ClaimEventType::Claimed)
                .cloned()
                .collect();

            // For releases, only include if agent has polled and received the original claim.
            // Include if claim_id < last_seq (strictly less - was in a previous poll, and
            // last_seq is max+1 after each poll) OR claim_id is in current batch.
            let new_claim_ids: HashSet<i64> = new_claims.iter().map(|c| c.id).collect();

            let dropped_claims: Vec<ClaimEvent> = events
                .iter()
                .filter(|e| e.event == ClaimEventType::Released)
                .filter(|release| {
                    match release.claim_id {
                        Some(cid) => cid < last_seq || new_claim_ids.contains(&cid),
                        None => true, // Legacy releases without claim_id - include them
                    }
                })
                .cloned()
                .collect();

            Ok(ClaimUpdates {
                new_claims,
                dropped_claims,
                sequence: new_seq,
            })
        })
    }

    /// Get file locks with full details.
    pub fn get_file_locks(
        &self,
        file_paths: Option<Vec<String>>,
        agent_id: Option<&str>,
        task_id: Option<&str>,
    ) -> Result<HashMap<String, FileLock>> {
        self.with_conn(|conn| {
            let locks = if let Some(paths) = file_paths {
                if paths.is_empty() {
                    return Ok(HashMap::new());
                }

                let placeholders: Vec<String> = paths.iter().map(|_| "?".to_string()).collect();
                let sql = format!(
                    "SELECT file_path, worker_id, reason, locked_at, task_id FROM file_locks WHERE file_path IN ({})",
                    placeholders.join(", ")
                );

                let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
                for path in &paths {
                    params_vec.push(Box::new(path.clone()));
                }

                let params_refs: Vec<&dyn rusqlite::ToSql> =
                    params_vec.iter().map(|b| b.as_ref()).collect();

                let mut stmt = conn.prepare(&sql)?;
                stmt.query_map(params_refs.as_slice(), |row| {
                    let file_path: String = row.get(0)?;
                    Ok((file_path.clone(), FileLock {
                        file_path,
                        worker_id: row.get(1)?,
                        reason: row.get(2)?,
                        locked_at: row.get(3)?,
                        task_id: row.get(4)?,
                    }))
                })?
                .filter_map(|r| r.ok())
                .collect()
            } else if let Some(aid) = agent_id {
                let mut stmt = conn.prepare(
                    "SELECT file_path, worker_id, reason, locked_at, task_id FROM file_locks WHERE worker_id = ?1",
                )?;
                stmt.query_map(params![aid], |row| {
                    let file_path: String = row.get(0)?;
                    Ok((file_path.clone(), FileLock {
                        file_path,
                        worker_id: row.get(1)?,
                        reason: row.get(2)?,
                        locked_at: row.get(3)?,
                        task_id: row.get(4)?,
                    }))
                })?
                .filter_map(|r| r.ok())
                .collect()
            } else if let Some(tid) = task_id {
                let mut stmt = conn.prepare(
                    "SELECT file_path, worker_id, reason, locked_at, task_id FROM file_locks WHERE task_id = ?1",
                )?;
                stmt.query_map(params![tid], |row| {
                    let file_path: String = row.get(0)?;
                    Ok((file_path.clone(), FileLock {
                        file_path,
                        worker_id: row.get(1)?,
                        reason: row.get(2)?,
                        locked_at: row.get(3)?,
                        task_id: row.get(4)?,
                    }))
                })?
                .filter_map(|r| r.ok())
                .collect()
            } else {
                // Return empty - we now require at least one filter
                HashMap::new()
            };

            Ok(locks)
        })
    }

    /// Get all file locks as FileLock objects.
    pub fn get_all_file_locks(&self) -> Result<Vec<FileLock>> {
        self.with_conn(|conn| {
            let mut stmt = conn.prepare(
                "SELECT file_path, worker_id, reason, locked_at, task_id FROM file_locks",
            )?;

            let locks = stmt
                .query_map([], |row| {
                    let file_path: String = row.get(0)?;
                    let worker_id: String = row.get(1)?;
                    let reason: Option<String> = row.get(2)?;
                    let locked_at: i64 = row.get(3)?;
                    let task_id: Option<String> = row.get(4)?;
                    Ok(FileLock {
                        file_path,
                        worker_id,
                        reason,
                        locked_at,
                        task_id,
                    })
                })?
                .filter_map(|r| r.ok())
                .collect();

            Ok(locks)
        })
    }

    /// Release all locks held by a worker.
    pub fn release_worker_locks(&self, worker_id: &str) -> Result<i32> {
        let now = now_ms();

        self.with_conn(|conn| {
            // Close any open claims for this worker
            conn.execute(
                "UPDATE claim_sequence SET end_timestamp = ?1
                 WHERE worker_id = ?2 AND end_timestamp IS NULL",
                params![now, worker_id],
            )?;

            let deleted = conn.execute(
                "DELETE FROM file_locks WHERE worker_id = ?1",
                params![worker_id],
            )?;

            Ok(deleted as i32)
        })
    }

    /// Release all locks associated with a task.
    /// Called automatically when a task completes.
    pub fn release_task_locks(&self, task_id: &str) -> Result<i32> {
        let now = now_ms();

        self.with_conn(|conn| {
            // Get files locked by this task before deleting
            let files_to_release: Vec<(String, String)> = {
                let mut stmt =
                    conn.prepare("SELECT file_path, worker_id FROM file_locks WHERE task_id = ?1")?;
                stmt.query_map(params![task_id], |row| {
                    Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
                })?
                .filter_map(|r| r.ok())
                .collect()
            };

            // Close any open claims for these files
            for (file_path, worker_id) in &files_to_release {
                conn.execute(
                    "UPDATE claim_sequence SET end_timestamp = ?1
                     WHERE file_path = ?2 AND worker_id = ?3 AND end_timestamp IS NULL",
                    params![now, file_path, worker_id],
                )?;

                // Record release event
                conn.execute(
                    "INSERT INTO claim_sequence (file_path, worker_id, event, reason, timestamp)
                     VALUES (?1, ?2, 'released', 'task completed', ?3)",
                    params![file_path, worker_id, now],
                )?;
            }

            let deleted = conn.execute(
                "DELETE FROM file_locks WHERE task_id = ?1",
                params![task_id],
            )?;

            Ok(deleted as i32)
        })
    }
}