1use super::{now_ms, Database};
4use crate::types::{ClaimEvent, ClaimEventType, ClaimUpdates, FileLock};
5use anyhow::Result;
6use rusqlite::params;
7use std::collections::{HashMap, HashSet};
8
9impl Database {
10 pub fn lock_file(&self, file_path: String, worker_id: &str, reason: Option<String>, task_id: Option<String>) -> Result<Option<String>> {
13 let now = now_ms();
14
15 self.with_conn_mut(|conn| {
16 let tx = conn.transaction()?;
17 let existing: Option<String> = tx
19 .query_row(
20 "SELECT worker_id FROM file_locks WHERE file_path = ?1",
21 params![&file_path],
22 |row| row.get(0),
23 )
24 .ok();
25
26 let result = if let Some(existing_worker) = existing {
27 if existing_worker != worker_id {
28 Some(existing_worker)
30 } else {
31 tx.execute(
33 "UPDATE file_locks SET locked_at = ?1, reason = ?2, task_id = ?3 WHERE file_path = ?4",
34 params![now, &reason, &task_id, &file_path],
35 )?;
36 None
37 }
38 } else {
39 tx.execute(
41 "INSERT INTO file_locks (file_path, worker_id, reason, locked_at, task_id) VALUES (?1, ?2, ?3, ?4, ?5)",
42 params![&file_path, worker_id, &reason, now, &task_id],
43 )?;
44
45 tx.execute(
47 "INSERT INTO claim_sequence (file_path, worker_id, event, reason, timestamp) VALUES (?1, ?2, 'claimed', ?3, ?4)",
48 params![&file_path, worker_id, &reason, now],
49 )?;
50 None
51 };
52
53 tx.commit()?;
54 Ok(result)
55 })
56 }
57
58 pub fn unlock_file(&self, file_path: &str, worker_id: &str, reason: Option<String>) -> Result<bool> {
60 let now = now_ms();
61
62 self.with_conn_mut(|conn| {
63 let tx = conn.transaction()?;
64
65 let deleted = tx.execute(
66 "DELETE FROM file_locks WHERE file_path = ?1 AND worker_id = ?2",
67 params![file_path, worker_id],
68 )?;
69
70 if deleted > 0 {
71 let claim_id: Option<i64> = tx.query_row(
73 "SELECT MAX(id) FROM claim_sequence
74 WHERE file_path = ?1 AND worker_id = ?2 AND event = 'claimed'",
75 params![file_path, worker_id],
76 |row| row.get(0),
77 ).ok().flatten();
78
79 tx.execute(
81 "UPDATE claim_sequence SET end_timestamp = ?1
82 WHERE file_path = ?2 AND worker_id = ?3 AND end_timestamp IS NULL",
83 params![now, file_path, worker_id],
84 )?;
85
86 tx.execute(
88 "INSERT INTO claim_sequence (file_path, worker_id, event, reason, timestamp, claim_id)
89 VALUES (?1, ?2, 'released', ?3, ?4, ?5)",
90 params![file_path, worker_id, &reason, now, claim_id],
91 )?;
92 }
93
94 tx.commit()?;
95 Ok(deleted > 0)
96 })
97 }
98
99 pub fn unlock_files_verbose(
102 &self,
103 file_paths: Vec<String>,
104 worker_id: &str,
105 reason: Option<String>,
106 ) -> Result<Vec<(String, String)>> {
107 let now = now_ms();
108 let mut released = Vec::new();
109
110 self.with_conn_mut(|conn| {
111 let tx = conn.transaction()?;
112
113 for file_path in file_paths {
114 let deleted = tx.execute(
115 "DELETE FROM file_locks WHERE file_path = ?1 AND worker_id = ?2",
116 params![&file_path, worker_id],
117 )?;
118
119 if deleted > 0 {
120 let claim_id: Option<i64> = tx.query_row(
122 "SELECT MAX(id) FROM claim_sequence
123 WHERE file_path = ?1 AND worker_id = ?2 AND event = 'claimed'",
124 params![&file_path, worker_id],
125 |row| row.get(0),
126 ).ok().flatten();
127
128 tx.execute(
130 "UPDATE claim_sequence SET end_timestamp = ?1
131 WHERE file_path = ?2 AND worker_id = ?3 AND end_timestamp IS NULL",
132 params![now, &file_path, worker_id],
133 )?;
134
135 tx.execute(
137 "INSERT INTO claim_sequence (file_path, worker_id, event, reason, timestamp, claim_id)
138 VALUES (?1, ?2, 'released', ?3, ?4, ?5)",
139 params![&file_path, worker_id, &reason, now, claim_id],
140 )?;
141
142 released.push((file_path, worker_id.to_string()));
143 }
144 }
145
146 tx.commit()?;
147 Ok(released)
148 })
149 }
150
151 pub fn release_worker_locks_verbose(&self, worker_id: &str, reason: Option<String>) -> Result<Vec<(String, String)>> {
154 let now = now_ms();
155
156 self.with_conn_mut(|conn| {
157 let tx = conn.transaction()?;
158
159 let files_to_release: Vec<String> = {
161 let mut stmt = tx.prepare(
162 "SELECT file_path FROM file_locks WHERE worker_id = ?1"
163 )?;
164 stmt.query_map(params![worker_id], |row| row.get::<_, String>(0))?
165 .filter_map(|r| r.ok())
166 .collect()
167 };
168
169 if files_to_release.is_empty() {
170 tx.commit()?;
171 return Ok(Vec::new());
172 }
173
174 tx.execute(
176 "UPDATE claim_sequence SET end_timestamp = ?1
177 WHERE worker_id = ?2 AND end_timestamp IS NULL",
178 params![now, worker_id],
179 )?;
180
181 for file_path in &files_to_release {
183 tx.execute(
184 "INSERT INTO claim_sequence (file_path, worker_id, event, reason, timestamp)
185 VALUES (?1, ?2, 'released', ?3, ?4)",
186 params![file_path, worker_id, &reason, now],
187 )?;
188 }
189
190 tx.execute(
192 "DELETE FROM file_locks WHERE worker_id = ?1",
193 params![worker_id],
194 )?;
195
196 tx.commit()?;
197
198 let released: Vec<(String, String)> = files_to_release
199 .into_iter()
200 .map(|f| (f, worker_id.to_string()))
201 .collect();
202
203 Ok(released)
204 })
205 }
206
207 pub fn release_task_locks_verbose(&self, task_id: &str, reason: Option<String>) -> Result<Vec<(String, String)>> {
210 let now = now_ms();
211
212 self.with_conn_mut(|conn| {
213 let tx = conn.transaction()?;
214
215 let files_to_release: Vec<(String, String)> = {
217 let mut stmt = tx.prepare(
218 "SELECT file_path, worker_id FROM file_locks WHERE task_id = ?1"
219 )?;
220 stmt.query_map(params![task_id], |row| {
221 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
222 })?
223 .filter_map(|r| r.ok())
224 .collect()
225 };
226
227 if files_to_release.is_empty() {
228 tx.commit()?;
229 return Ok(Vec::new());
230 }
231
232 for (file_path, worker_id) in &files_to_release {
234 tx.execute(
235 "UPDATE claim_sequence SET end_timestamp = ?1
236 WHERE file_path = ?2 AND worker_id = ?3 AND end_timestamp IS NULL",
237 params![now, file_path, worker_id],
238 )?;
239
240 let reason_str = reason.as_deref().unwrap_or("task release");
242 tx.execute(
243 "INSERT INTO claim_sequence (file_path, worker_id, event, reason, timestamp)
244 VALUES (?1, ?2, 'released', ?3, ?4)",
245 params![file_path, worker_id, reason_str, now],
246 )?;
247 }
248
249 tx.execute(
251 "DELETE FROM file_locks WHERE task_id = ?1",
252 params![task_id],
253 )?;
254
255 tx.commit()?;
256 Ok(files_to_release)
257 })
258 }
259
260 pub fn claim_updates(&self, worker_id: &str) -> Result<ClaimUpdates> {
263 self.with_conn(|conn| {
264 let last_seq: i64 = conn
266 .query_row(
267 "SELECT last_claim_sequence FROM workers WHERE id = ?1",
268 params![worker_id],
269 |row| row.get(0),
270 )
271 .unwrap_or(0);
272
273 let mut stmt = conn.prepare(
276 "SELECT id, file_path, worker_id, event, reason, timestamp, end_timestamp, claim_id
277 FROM claim_sequence
278 WHERE id >= ?1
279 ORDER BY id"
280 )?;
281 let events: Vec<ClaimEvent> = stmt.query_map(params![last_seq], |row| {
282 Ok(ClaimEvent {
283 id: row.get(0)?,
284 file_path: row.get(1)?,
285 worker_id: row.get(2)?,
286 event: ClaimEventType::from_str(&row.get::<_, String>(3)?).unwrap_or(ClaimEventType::Claimed),
287 reason: row.get(4)?,
288 timestamp: row.get(5)?,
289 end_timestamp: row.get(6)?,
290 claim_id: row.get(7)?,
291 })
292 })?
293 .filter_map(|r| r.ok())
294 .collect();
295
296 let max_seen = events.iter().map(|e| e.id).max();
299 let new_seq = match max_seen {
300 Some(max) => max + 1, None => last_seq, };
303
304 if new_seq > last_seq {
306 conn.execute(
307 "UPDATE workers SET last_claim_sequence = ?1 WHERE id = ?2",
308 params![new_seq, worker_id],
309 )?;
310 }
311
312 let new_claims: Vec<ClaimEvent> = events.iter()
314 .filter(|e| e.event == ClaimEventType::Claimed)
315 .cloned()
316 .collect();
317
318 let new_claim_ids: HashSet<i64> = new_claims.iter()
322 .map(|c| c.id)
323 .collect();
324
325 let dropped_claims: Vec<ClaimEvent> = events.iter()
326 .filter(|e| e.event == ClaimEventType::Released)
327 .filter(|release| {
328 match release.claim_id {
329 Some(cid) => cid < last_seq || new_claim_ids.contains(&cid),
330 None => true, }
332 })
333 .cloned()
334 .collect();
335
336 Ok(ClaimUpdates {
337 new_claims,
338 dropped_claims,
339 sequence: new_seq,
340 })
341 })
342 }
343
344 pub fn get_file_locks(
346 &self,
347 file_paths: Option<Vec<String>>,
348 agent_id: Option<&str>,
349 task_id: Option<&str>,
350 ) -> Result<HashMap<String, FileLock>> {
351 self.with_conn(|conn| {
352 let locks = if let Some(paths) = file_paths {
353 if paths.is_empty() {
354 return Ok(HashMap::new());
355 }
356
357 let placeholders: Vec<String> = paths.iter().map(|_| "?".to_string()).collect();
358 let sql = format!(
359 "SELECT file_path, worker_id, reason, locked_at, task_id FROM file_locks WHERE file_path IN ({})",
360 placeholders.join(", ")
361 );
362
363 let mut params_vec: Vec<Box<dyn rusqlite::ToSql>> = Vec::new();
364 for path in &paths {
365 params_vec.push(Box::new(path.clone()));
366 }
367
368 let params_refs: Vec<&dyn rusqlite::ToSql> =
369 params_vec.iter().map(|b| b.as_ref()).collect();
370
371 let mut stmt = conn.prepare(&sql)?;
372 stmt.query_map(params_refs.as_slice(), |row| {
373 let file_path: String = row.get(0)?;
374 Ok((file_path.clone(), FileLock {
375 file_path,
376 worker_id: row.get(1)?,
377 reason: row.get(2)?,
378 locked_at: row.get(3)?,
379 task_id: row.get(4)?,
380 }))
381 })?
382 .filter_map(|r| r.ok())
383 .collect()
384 } else if let Some(aid) = agent_id {
385 let mut stmt = conn.prepare(
386 "SELECT file_path, worker_id, reason, locked_at, task_id FROM file_locks WHERE worker_id = ?1",
387 )?;
388 stmt.query_map(params![aid], |row| {
389 let file_path: String = row.get(0)?;
390 Ok((file_path.clone(), FileLock {
391 file_path,
392 worker_id: row.get(1)?,
393 reason: row.get(2)?,
394 locked_at: row.get(3)?,
395 task_id: row.get(4)?,
396 }))
397 })?
398 .filter_map(|r| r.ok())
399 .collect()
400 } else if let Some(tid) = task_id {
401 let mut stmt = conn.prepare(
402 "SELECT file_path, worker_id, reason, locked_at, task_id FROM file_locks WHERE task_id = ?1",
403 )?;
404 stmt.query_map(params![tid], |row| {
405 let file_path: String = row.get(0)?;
406 Ok((file_path.clone(), FileLock {
407 file_path,
408 worker_id: row.get(1)?,
409 reason: row.get(2)?,
410 locked_at: row.get(3)?,
411 task_id: row.get(4)?,
412 }))
413 })?
414 .filter_map(|r| r.ok())
415 .collect()
416 } else {
417 HashMap::new()
419 };
420
421 Ok(locks)
422 })
423 }
424
425 pub fn get_all_file_locks(&self) -> Result<Vec<FileLock>> {
427 self.with_conn(|conn| {
428 let mut stmt =
429 conn.prepare("SELECT file_path, worker_id, reason, locked_at, task_id FROM file_locks")?;
430
431 let locks = stmt
432 .query_map([], |row| {
433 let file_path: String = row.get(0)?;
434 let worker_id: String = row.get(1)?;
435 let reason: Option<String> = row.get(2)?;
436 let locked_at: i64 = row.get(3)?;
437 let task_id: Option<String> = row.get(4)?;
438 Ok(FileLock {
439 file_path,
440 worker_id,
441 reason,
442 locked_at,
443 task_id,
444 })
445 })?
446 .filter_map(|r| r.ok())
447 .collect();
448
449 Ok(locks)
450 })
451 }
452
453 pub fn release_worker_locks(&self, worker_id: &str) -> Result<i32> {
455 let now = now_ms();
456
457 self.with_conn(|conn| {
458 conn.execute(
460 "UPDATE claim_sequence SET end_timestamp = ?1
461 WHERE worker_id = ?2 AND end_timestamp IS NULL",
462 params![now, worker_id],
463 )?;
464
465 let deleted = conn.execute(
466 "DELETE FROM file_locks WHERE worker_id = ?1",
467 params![worker_id],
468 )?;
469
470 Ok(deleted as i32)
471 })
472 }
473
474
475 pub fn release_task_locks(&self, task_id: &str) -> Result<i32> {
478 let now = now_ms();
479
480 self.with_conn(|conn| {
481 let files_to_release: Vec<(String, String)> = {
483 let mut stmt = conn.prepare(
484 "SELECT file_path, worker_id FROM file_locks WHERE task_id = ?1"
485 )?;
486 stmt.query_map(params![task_id], |row| {
487 Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
488 })?
489 .filter_map(|r| r.ok())
490 .collect()
491 };
492
493 for (file_path, worker_id) in &files_to_release {
495 conn.execute(
496 "UPDATE claim_sequence SET end_timestamp = ?1
497 WHERE file_path = ?2 AND worker_id = ?3 AND end_timestamp IS NULL",
498 params![now, file_path, worker_id],
499 )?;
500
501 conn.execute(
503 "INSERT INTO claim_sequence (file_path, worker_id, event, reason, timestamp)
504 VALUES (?1, ?2, 'released', 'task completed', ?3)",
505 params![file_path, worker_id, now],
506 )?;
507 }
508
509 let deleted = conn.execute(
510 "DELETE FROM file_locks WHERE task_id = ?1",
511 params![task_id],
512 )?;
513
514 Ok(deleted as i32)
515 })
516 }
517}