1use crate::error::{Contextable, Error, Result};
2use crate::util::{handle_fs_err, handle_fs_res};
3use fd_lock::{RwLock, RwLockWriteGuard};
4use std::fs::File;
5use std::io::Seek;
6use std::io::{Read, Write};
7use std::path::{Path, PathBuf};
8
/// Information about the current process that gets written into the lock file.
///
/// All string data is borrowed from the caller, so the value never outlives
/// the script name and argument slice it was built from.
#[derive(Debug)]
struct ProcessInfoWrite<'a> {
    /// OS process id of the writer.
    pid: u32,
    /// Identifier of the script being run.
    script_id: i64,
    /// Name of the script being run.
    script_name: &'a str,
    /// Arguments the script was invoked with.
    args: &'a [String],
}
16
/// Process information parsed back out of a lock file.
///
/// The whole file is kept in memory; the header line (pid and script id) is
/// parsed into fields and the rest is exposed as a slice of the raw content.
#[derive(Debug)]
pub struct ProcessInfoRead {
    /// Complete, unmodified content of the lock file.
    raw_file_content: String,
    /// Byte offset into `raw_file_content` where the part after the header
    /// line begins.
    file_content_start: usize,

    /// Process id recorded in the header line.
    pub pid: u32,
    /// Script id recorded in the header line.
    pub script_id: i64,
}
26impl ProcessInfoRead {
27 fn new(raw_file_content: String) -> Result<ProcessInfoRead> {
28 log::debug!("處理進程資訊:{:?}", raw_file_content);
29
30 let new_line = raw_file_content
31 .find('\n')
32 .ok_or_else(|| Error::msg("can't find new line"))?;
33 let (pid, script_id) = raw_file_content[..new_line]
34 .split_once(' ')
35 .ok_or_else(|| Error::msg("can't find space"))?;
36
37 let pid = pid.parse()?;
38 let script_id = script_id.parse()?;
39
40 Ok(ProcessInfoRead {
41 script_id,
42 pid,
43 raw_file_content,
44 file_content_start: new_line + 1,
45 })
46 }
47 pub fn file_content(&self) -> &'_ str {
48 &self.raw_file_content[self.file_content_start..]
49 }
50}
51
52pub struct ProcessLockWrite<'a> {
53 core: ProcessLockCore,
54 process: ProcessInfoWrite<'a>,
55}
56
57pub struct ProcessLockCore {
58 run_id: i64,
59 lock: RwLock<File>,
60 pub path: PathBuf,
61}
62
63fn try_write<'a>(
64 lock: &'a mut RwLock<File>,
65 path: &PathBuf,
66) -> Result<Option<RwLockWriteGuard<'a, File>>> {
67 match lock.try_write() {
68 Ok(guard) => Ok(Some(guard)),
69 Err(err) => match err.kind() {
70 std::io::ErrorKind::WouldBlock => Ok(None),
71 _ => Err(handle_fs_err(&[&*path], err)),
72 },
73 }
74}
75impl ProcessLockCore {
76 pub fn get_can_write(&mut self) -> Result<bool> {
77 Ok(try_write(&mut self.lock, &self.path)?.is_some())
78 }
79 pub fn build(mut self) -> Result<ProcessLockRead> {
80 let mut file = self.lock.into_inner();
81 let mut content = String::new();
82 handle_fs_res(&[&self.path], file.read_to_string(&mut content)).context("讀取檔案失敗")?;
83 self.lock = RwLock::new(file);
84
85 let process = ProcessInfoRead::new(content)?;
86 Ok(ProcessLockRead {
87 core: self,
88 process,
89 })
90 }
91}
92
93impl<'a> ProcessLockWrite<'a> {
94 pub fn new(
95 run_id: i64,
96 script_id: i64,
97 script_name: &'a str,
98 args: &'a [String],
99 ) -> Result<Self> {
100 let path = crate::path::get_process_lock(run_id)?;
101 let file = handle_fs_res(&[&path], File::create(&path))?;
102
103 let process = ProcessInfoWrite {
104 pid: std::process::id(),
105 script_id,
106 script_name,
107 args,
108 };
109
110 Ok(ProcessLockWrite {
111 core: ProcessLockCore {
112 lock: RwLock::new(file),
113 run_id,
114 path,
115 },
116 process,
117 })
118 }
119 pub fn try_write_info(&mut self) -> Result<Option<RwLockWriteGuard<'_, File>>> {
120 let mut guard_opt = try_write(&mut self.core.lock, &self.core.path)?;
121 if let Some(guard) = guard_opt.as_mut() {
122 write!(
123 guard,
124 "{} {}\n{}",
125 self.process.pid, self.process.script_id, self.process.script_name
126 )?;
127 for arg in self.process.args.iter() {
128 write!(guard, " {}", arg)?;
129 }
130 return Ok(guard_opt);
131 }
132
133 log::warn!("{:?} 竟然被其它人鎖住了…?", self.core.path);
134 Ok(None)
135 }
136 pub fn mark_sucess(guard: Option<RwLockWriteGuard<'_, File>>) {
137 if let Some(guard) = guard {
138 if let Err(err) = guard.set_len(0) {
139 log::warn!("Failed to mark file lock as success: {}", err)
140 }
141 }
142 }
143 pub fn get_path(&self) -> &Path {
144 &self.core.path
145 }
146}
147
148pub struct ProcessLockRead {
149 core: ProcessLockCore,
150 pub process: ProcessInfoRead,
151}
152impl ProcessLockRead {
153 pub fn get_run_id(&self) -> i64 {
154 self.core.run_id
155 }
156 pub fn builder(path: PathBuf, file_name: &str) -> Result<ProcessLockCore> {
157 let file = handle_fs_res(&[&path], File::open(&path))?;
158 let run_id = file_name.parse()?;
159
160 Ok(ProcessLockCore {
161 lock: RwLock::new(file),
162 run_id,
163 path,
164 })
165 }
166 pub fn wait_write(mut self) -> Result {
167 let g = self.core.lock.write()?;
168 drop(g);
169
170 let mut file = self.core.lock.into_inner();
171 file.rewind()?;
172 if file.bytes().next().is_none() {
173 Ok(())
174 } else {
175 log::warn!("檔案鎖內容沒被砍掉代表進程沒有正常結束…");
176 Err(Error::ScriptError(1))
177 }
178 }
179}
180
#[cfg(test)]
mod test {
    use super::*;

    const SCRIPT_NAME: &str = "this-name";

    /// A reader sees the lock as taken while the writer holds its guard, can
    /// parse what the writer wrote, and sees the lock free again once the
    /// guard is dropped.
    #[test]
    fn test_process_lock() {
        const RUN_ID: i64 = 1;
        const SCRIPT_ID: i64 = 2;
        let file_path = crate::path::get_process_lock(RUN_ID).unwrap();

        let mut write_lock = ProcessLockWrite::new(RUN_ID, SCRIPT_ID, SCRIPT_NAME, &[]).unwrap();
        let mut read_core =
            ProcessLockRead::builder(file_path.clone(), &RUN_ID.to_string()).unwrap();

        // Nobody holds the lock yet.
        assert!(read_core.get_can_write().unwrap());

        let write_guard = write_lock.try_write_info().unwrap();

        // The writer now holds it.
        assert!(!read_core.get_can_write().unwrap());

        let mut read_lock = read_core.build().unwrap();
        assert_eq!(RUN_ID, read_lock.core.run_id);
        assert_eq!(file_path, read_lock.core.path);
        assert_eq!(std::process::id(), read_lock.process.pid);
        assert_eq!(SCRIPT_ID, read_lock.process.script_id);
        assert!(read_lock.process.file_content().starts_with(SCRIPT_NAME));

        assert!(!read_lock.core.get_can_write().unwrap());
        drop(write_guard);
        assert!(read_lock.core.get_can_write().unwrap());
    }

    /// `wait_write` fails while the lock file still has content and succeeds
    /// after the writer has marked the run as successful.
    #[test]
    fn test_process_success() {
        const RUN_ID: i64 = 11;
        const SCRIPT_ID: i64 = 22;
        let file_path = crate::path::get_process_lock(RUN_ID).unwrap();

        let mut write_lock = ProcessLockWrite::new(RUN_ID, SCRIPT_ID, SCRIPT_NAME, &[]).unwrap();
        let open_reader = || {
            ProcessLockRead::builder(file_path.clone(), &RUN_ID.to_string())
                .unwrap()
                .build()
                .unwrap()
        };

        // Write the info and release the lock without marking success.
        let write_guard = write_lock.try_write_info().unwrap();
        drop(write_guard);

        let read_lock = open_reader();
        let res = read_lock.wait_write();
        assert!(matches!(res, Err(Error::ScriptError(_))));

        // The reader must be built before the file is truncated, otherwise
        // parsing the header would fail.
        let read_lock = open_reader();
        let write_guard = write_lock.try_write_info().unwrap();
        ProcessLockWrite::mark_sucess(write_guard);

        read_lock.wait_write().expect("應該要成功");
    }
}