1use crate::{
2 config::AppConfig,
3 database::{Database, History, QueueItem},
4 notification::notification,
5 utils, *,
6};
7use anyhow::{Context, Result};
8use chrono::Utc;
9use ssh2::Session;
10use std::{fs::File, io::BufReader, net::TcpStream, path::Path, sync::Arc, time::Duration};
11use tokio::{sync::mpsc, time};
12
13#[derive(Debug)]
14pub struct SftpManager {
15 config: Arc<AppConfig>,
16 database: Arc<Database>,
17}
18
19
20impl SftpManager {
21 pub fn new(
22 config: Arc<AppConfig>,
23 database: Arc<Database>,
24 ) -> (Self, mpsc::UnboundedReceiver<()>) {
25 let (_tx, rx) = mpsc::unbounded_channel();
26 (
27 SftpManager {
28 config,
29 database,
30 },
31 rx,
32 )
33 }
34
35
36 pub async fn start(self: Arc<Self>) {
37 let mut interval =
38 time::interval(Duration::from_millis(self.config.fs_check_interval));
39
40 info!(
41 "Starting SFTP queue processor with check interval: {}ms",
42 self.config.fs_check_interval
43 );
44
45 loop {
46 interval.tick().await;
47 if let Err(e) = self.process_queue().await {
48 error!("Error processing queue: {e:?}");
49 }
50 }
51 }
52
53
54 async fn process_queue(&self) -> Result<()> {
55 let queue = self.database.get_queue()?;
56
57 if queue.is_empty() {
58 return Ok(());
59 }
60
61 self.build_clipboard(&queue)?;
63
64 for item in queue {
66 if let Err(e) = self.process_element(&item).await {
67 error!("Error processing queue element: {e:?}");
68 }
69 }
70
71 Ok(())
72 }
73
74
75 fn build_clipboard(&self, queue: &[QueueItem]) -> Result<()> {
76 let links: Vec<String> = queue
77 .iter()
78 .map(|item| {
79 let config = self
80 .config
81 .select_config()
82 .expect("One of configs should always be selected!");
83 format!(
84 "{}{}{}",
85 config.address,
86 item.uuid,
87 file_extension(&item.local_file)
88 )
89 })
90 .collect();
91
92 let content = links.join(", ");
93 put_to_clipboard(&content)?;
94
95 if self.config.notifications.clipboard {
96 let _ = notification(
97 "Link copied to clipboard",
98 "clipboard",
99 &self.config.notifications,
100 &self.config.sounds,
101 );
102 }
103
104 Ok(())
105 }
106
107
108 async fn process_element(&self, item: &QueueItem) -> Result<()> {
109 let path = Path::new(&item.local_file);
110
111 if !path.exists() || !path.is_file() {
113 debug!(
114 "Local file not found or not a regular file: {}. Skipping.",
115 item.local_file
116 );
117 self.database.remove_from_queue(&item.uuid)?;
118 return Ok(());
119 }
120
121 let temp_pattern = regex::Regex::new(r".*-[a-zA-Z]{4,}$")?;
123 if temp_pattern.is_match(&item.local_file) {
124 debug!("File matches temp pattern, skipping: {}", item.local_file);
125 self.database.remove_from_queue(&item.uuid)?;
126 return Ok(());
127 }
128
129 let remote_file = format!("{}{}", item.remote_file, file_extension(&item.local_file));
131 self.send_file(&item.local_file, &remote_file).await?;
132
133 self.add_to_history(item)?;
135
136 self.database.remove_from_queue(&item.uuid)?;
138
139 Ok(())
140 }
141
142
143 async fn send_file(&self, local_file: &str, remote_file: &str) -> Result<()> {
144 let config = &self
145 .config
146 .select_config()
147 .expect("One of configs should always be selected!");
148
149 let tcp = TcpStream::connect(format!("{}:{}", config.hostname, config.ssh_port))
151 .context("Failed to connect to SSH server")?;
152 tcp.set_read_timeout(Some(Duration::from_millis(
153 self.config.ssh_connection_timeout,
154 )))?;
155 tcp.set_write_timeout(Some(Duration::from_millis(
156 self.config.ssh_connection_timeout,
157 )))?;
158
159 let mut sess = Session::new()?;
160 sess.set_tcp_stream(tcp);
161 sess.handshake()?;
162
163 let ssh_private_key = if config.ssh_key.is_empty() {
165 ".ssh/id_ed25519"
166 } else {
167 &config.ssh_key
168 };
169
170 sess.userauth_pubkey_file(
171 &config.username,
172 None,
173 Path::new(&home::home_dir().expect("Home dir has to be set!"))
174 .join(ssh_private_key)
175 .as_path(),
176 if config.ssh_key_pass.is_empty() {
177 None
178 } else {
179 Some(&config.ssh_key_pass)
180 },
181 )?;
182
183 if !sess.authenticated() {
184 anyhow::bail!("SSH authentication failed");
185 }
186
187 debug!("SSH connection established");
188
189 let sftp = sess.sftp()?;
191 debug!("SFTP session started");
192
193 let local_size = local_file_size(local_file)?;
195 let remote_size = sftp
196 .stat(Path::new(remote_file))
197 .map(|stat| stat.size.unwrap_or(0))
198 .unwrap_or(0);
199
200 debug!(
201 "Local file: {local_file} ({local_size}); Remote file: {remote_file} ({remote_size})"
202 );
203
204 if remote_size > 0 && remote_size == local_size {
205 info!("Found file of same size already uploaded. Skipping");
206 return Ok(());
207 }
208
209 let mut local = BufReader::new(File::open(local_file)?);
211 let mut remote = sftp.create(Path::new(remote_file))?;
212
213 stream_file_to_remote(
214 &mut local,
215 &mut remote,
216 self.config.sftp_buffer_size,
217 local_size,
218 )?;
219
220 if self.config.notifications.upload {
221 let _ = notification(
222 "Uploaded successfully.",
223 "upload",
224 &self.config.notifications,
225 &self.config.sounds,
226 );
227 }
228 Ok(())
229 }
230
231
232 fn add_to_history(&self, queue_item: &QueueItem) -> Result<()> {
233 let config = self
234 .config
235 .select_config()
236 .expect("One of configs should always be selected!");
237 let content = format!(
238 "{}{}{}",
239 config.address,
240 queue_item.uuid,
241 file_extension(&queue_item.local_file)
242 );
243
244 let history = self.database.get_history(None)?;
246 let exists = history.iter().any(|h| h.content.contains(&content));
247
248 if !exists {
249 let history_item = History {
250 content,
251 timestamp: Utc::now().timestamp(),
252 file: queue_item.local_file.clone(),
253 uuid: uuid::Uuid::new_v4().to_string(),
254 };
255 self.database.add_history(&history_item)?;
256 }
257
258 Ok(())
259 }
260}