1use crate::{
2 config::AppConfig,
3 database::{Database, History, QueueItem},
4 notification::notification,
5 utils, *,
6};
7use anyhow::{Context, Result};
8use ssh2::Session;
9use std::{fs::File, io::BufReader, net::TcpStream, path::Path, sync::Arc, time::Duration};
10use tokio::{sync::mpsc, time};
11
/// Processes the upload queue stored in the database and sends queued files
/// to the selected server over SFTP.
#[derive(Debug)]
pub struct SftpManager {
    /// Shared application configuration (check interval, SSH settings,
    /// notification and sound settings are read from it).
    config: Arc<AppConfig>,
    /// Shared database handle used for the upload queue and the history.
    database: Arc<Database>,
}
17
18
19impl SftpManager {
20 pub fn new(
21 config: Arc<AppConfig>,
22 database: Arc<Database>,
23 ) -> (Self, mpsc::UnboundedReceiver<()>) {
24 let (_tx, rx) = mpsc::unbounded_channel();
25 (
26 SftpManager {
27 config,
28 database,
29 },
30 rx,
31 )
32 }
33
34
35 pub async fn start(self: Arc<Self>) {
36 let mut interval =
37 time::interval(Duration::from_millis(self.config.fs_check_interval));
38
39 info!(
40 "Starting SFTP queue processor with check interval: {}ms",
41 self.config.fs_check_interval
42 );
43
44 loop {
45 interval.tick().await;
46 if let Err(e) = self.process_queue().await {
47 error!("Error processing queue: {e:?}");
48 }
49 }
50 }
51
52
53 async fn process_queue(&self) -> Result<()> {
54 let queue = self.database.get_queue()?;
55 if !queue.is_empty() {
56 self.build_clipboard(&queue)?;
58
59 for item in &queue {
61 if let Err(e) = self.process_element(item).await {
62 error!("Error processing queue element: {e:?}");
63 }
64 }
65 }
66 Ok(())
67 }
68
69
70 fn build_clipboard(&self, queue: &[QueueItem]) -> Result<()> {
71 let links: Vec<String> = queue
72 .iter()
73 .map(|item| {
74 let config = self
75 .config
76 .select_config()
77 .expect("One of configs should always be selected!");
78 format!(
79 "{}{}{}",
80 config.address,
81 item.uuid,
82 file_extension(&item.local_file)
83 )
84 })
85 .collect();
86
87 let content = links.join(", ");
88 put_to_clipboard(&content)?;
89
90 if self.config.notifications.clipboard {
91 let _ = notification(
92 "Link copied to clipboard",
93 "clipboard",
94 &self.config.notifications,
95 &self.config.sounds,
96 );
97 }
98
99 Ok(())
100 }
101
102
103 async fn process_element(&self, item: &QueueItem) -> Result<()> {
104 let path = Path::new(&item.local_file);
105
106 if !path.exists() || !path.is_file() {
108 debug!(
109 "Local file not found or not a regular file: {}. Skipping.",
110 item.local_file
111 );
112 self.database.remove_from_queue(&item.uuid)?;
113 return Ok(());
114 }
115
116 if TEMP_PATTERN.is_match(&item.local_file) {
118 debug!("File matches temp pattern, skipping: {}", item.local_file);
119 self.database.remove_from_queue(&item.uuid)?;
120 return Ok(());
121 }
122
123 let remote_file = format!("{}{}", item.remote_file, file_extension(&item.local_file));
125 self.send_file(&item.local_file, &remote_file).await?;
126
127 self.add_to_history(item)?;
129
130 self.database.remove_from_queue(&item.uuid)?;
132
133 Ok(())
134 }
135
136
137 async fn send_file(&self, local_file: &str, remote_file: &str) -> Result<()> {
138 let config = &self
139 .config
140 .select_config()
141 .expect("One of configs should always be selected!");
142
143 let tcp = TcpStream::connect(format!("{}:{}", config.hostname, config.ssh_port))
145 .context("Failed to connect to SSH server")?;
146 tcp.set_read_timeout(Some(Duration::from_millis(
147 self.config.ssh_connection_timeout,
148 )))?;
149 tcp.set_write_timeout(Some(Duration::from_millis(
150 self.config.ssh_connection_timeout,
151 )))?;
152
153 let mut sess = Session::new()?;
154 sess.set_tcp_stream(tcp);
155 sess.handshake()?;
156
157 let ssh_private_key = if config.ssh_key.is_empty() {
159 ".ssh/id_ed25519"
160 } else {
161 &config.ssh_key
162 };
163
164 sess.userauth_pubkey_file(
165 &config.username,
166 None,
167 Path::new(&home::home_dir().expect("Home dir has to be set!"))
168 .join(ssh_private_key)
169 .as_path(),
170 if config.ssh_key_pass.is_empty() {
171 None
172 } else {
173 Some(&config.ssh_key_pass)
174 },
175 )?;
176
177 if !sess.authenticated() {
178 anyhow::bail!("SSH authentication failed");
179 }
180
181 debug!("SSH connection established");
182
183 let sftp = sess.sftp()?;
185 debug!("SFTP session started");
186
187 let local_size = local_file_size(local_file)?;
189 let remote_size = sftp
190 .stat(Path::new(remote_file))
191 .map(|stat| stat.size.unwrap_or(0))
192 .unwrap_or(0);
193
194 debug!(
195 "Local file: {local_file} ({local_size}); Remote file: {remote_file} ({remote_size})"
196 );
197
198 if remote_size > 0 && remote_size == local_size {
199 info!("Found file of same size already uploaded. Skipping");
200 return Ok(());
201 }
202
203 let mut local = BufReader::new(File::open(local_file)?);
205 let mut remote = sftp.create(Path::new(remote_file))?;
206
207 stream_file_to_remote(
208 &mut local,
209 &mut remote,
210 self.config.sftp_buffer_size,
211 local_size,
212 )?;
213
214 if self.config.notifications.upload {
215 let _ = notification(
216 "Uploaded successfully.",
217 "upload",
218 &self.config.notifications,
219 &self.config.sounds,
220 );
221 }
222 Ok(())
223 }
224
225
226 fn add_to_history(&self, queue_item: &QueueItem) -> Result<()> {
227 let config = self
228 .config
229 .select_config()
230 .expect("One of configs should always be selected!");
231 let content = format!(
232 "{}{}{}",
233 config.address,
234 queue_item.uuid,
235 file_extension(&queue_item.local_file)
236 );
237
238 let history = self.database.get_history(None)?;
240 let exists = history.iter().any(|h| h.content.contains(&content));
241
242 if !exists {
243 let history_item = History {
244 content,
245 timestamp: chrono::Local::now().timestamp(),
246 file: queue_item.local_file.clone(),
247 uuid: uuid::Uuid::new_v4().to_string(),
248 };
249 self.database.add_history(&history_item)?;
250 }
251
252 Ok(())
253 }
254}