1use crate::{
2 config::AppConfig,
3 database::{Database, History, QueueItem},
4 notification::notification,
5 utils, *,
6};
7use anyhow::{Context, Result};
8use chrono::Utc;
9use ssh2::Session;
10use std::{fs::File, io::BufReader, net::TcpStream, path::Path, sync::Arc, time::Duration};
11use tokio::{sync::mpsc, time};
12
13#[derive(Debug)]
14pub struct SftpManager {
15 config: Arc<AppConfig>,
16 database: Arc<Database>,
17}
18
19
20impl SftpManager {
21 pub fn new(
22 config: Arc<AppConfig>,
23 database: Arc<Database>,
24 ) -> (Self, mpsc::UnboundedReceiver<()>) {
25 let (_tx, rx) = mpsc::unbounded_channel();
26 (
27 SftpManager {
28 config,
29 database,
30 },
31 rx,
32 )
33 }
34
35
36 pub async fn start(self: Arc<Self>) {
37 let mut interval =
38 time::interval(Duration::from_millis(self.config.fs_check_interval));
39
40 info!(
41 "Starting SFTP queue processor with check interval: {}ms",
42 self.config.fs_check_interval
43 );
44
45 loop {
46 interval.tick().await;
47 if let Err(e) = self.process_queue().await {
48 error!("Error processing queue: {e:?}");
49 }
50 }
51 }
52
53
54 async fn process_queue(&self) -> Result<()> {
55 let queue = self.database.get_queue()?;
56 if !queue.is_empty() {
57 self.build_clipboard(&queue)?;
59
60 for item in &queue {
62 if let Err(e) = self.process_element(item).await {
63 error!("Error processing queue element: {e:?}");
64 }
65 }
66 }
67 Ok(())
68 }
69
70
71 fn build_clipboard(&self, queue: &[QueueItem]) -> Result<()> {
72 let links: Vec<String> = queue
73 .iter()
74 .map(|item| {
75 let config = self
76 .config
77 .select_config()
78 .expect("One of configs should always be selected!");
79 format!(
80 "{}{}{}",
81 config.address,
82 item.uuid,
83 file_extension(&item.local_file)
84 )
85 })
86 .collect();
87
88 let content = links.join(", ");
89 put_to_clipboard(&content)?;
90
91 if self.config.notifications.clipboard {
92 let _ = notification(
93 "Link copied to clipboard",
94 "clipboard",
95 &self.config.notifications,
96 &self.config.sounds,
97 );
98 }
99
100 Ok(())
101 }
102
103
104 async fn process_element(&self, item: &QueueItem) -> Result<()> {
105 let path = Path::new(&item.local_file);
106
107 if !path.exists() || !path.is_file() {
109 debug!(
110 "Local file not found or not a regular file: {}. Skipping.",
111 item.local_file
112 );
113 self.database.remove_from_queue(&item.uuid)?;
114 return Ok(());
115 }
116
117 if TEMP_PATTERN.is_match(&item.local_file) {
119 debug!("File matches temp pattern, skipping: {}", item.local_file);
120 self.database.remove_from_queue(&item.uuid)?;
121 return Ok(());
122 }
123
124 let remote_file = format!("{}{}", item.remote_file, file_extension(&item.local_file));
126 self.send_file(&item.local_file, &remote_file).await?;
127
128 self.add_to_history(item)?;
130
131 self.database.remove_from_queue(&item.uuid)?;
133
134 Ok(())
135 }
136
137
138 async fn send_file(&self, local_file: &str, remote_file: &str) -> Result<()> {
139 let config = &self
140 .config
141 .select_config()
142 .expect("One of configs should always be selected!");
143
144 let tcp = TcpStream::connect(format!("{}:{}", config.hostname, config.ssh_port))
146 .context("Failed to connect to SSH server")?;
147 tcp.set_read_timeout(Some(Duration::from_millis(
148 self.config.ssh_connection_timeout,
149 )))?;
150 tcp.set_write_timeout(Some(Duration::from_millis(
151 self.config.ssh_connection_timeout,
152 )))?;
153
154 let mut sess = Session::new()?;
155 sess.set_tcp_stream(tcp);
156 sess.handshake()?;
157
158 let ssh_private_key = if config.ssh_key.is_empty() {
160 ".ssh/id_ed25519"
161 } else {
162 &config.ssh_key
163 };
164
165 sess.userauth_pubkey_file(
166 &config.username,
167 None,
168 Path::new(&home::home_dir().expect("Home dir has to be set!"))
169 .join(ssh_private_key)
170 .as_path(),
171 if config.ssh_key_pass.is_empty() {
172 None
173 } else {
174 Some(&config.ssh_key_pass)
175 },
176 )?;
177
178 if !sess.authenticated() {
179 anyhow::bail!("SSH authentication failed");
180 }
181
182 debug!("SSH connection established");
183
184 let sftp = sess.sftp()?;
186 debug!("SFTP session started");
187
188 let local_size = local_file_size(local_file)?;
190 let remote_size = sftp
191 .stat(Path::new(remote_file))
192 .map(|stat| stat.size.unwrap_or(0))
193 .unwrap_or(0);
194
195 debug!(
196 "Local file: {local_file} ({local_size}); Remote file: {remote_file} ({remote_size})"
197 );
198
199 if remote_size > 0 && remote_size == local_size {
200 info!("Found file of same size already uploaded. Skipping");
201 return Ok(());
202 }
203
204 let mut local = BufReader::new(File::open(local_file)?);
206 let mut remote = sftp.create(Path::new(remote_file))?;
207
208 stream_file_to_remote(
209 &mut local,
210 &mut remote,
211 self.config.sftp_buffer_size,
212 local_size,
213 )?;
214
215 if self.config.notifications.upload {
216 let _ = notification(
217 "Uploaded successfully.",
218 "upload",
219 &self.config.notifications,
220 &self.config.sounds,
221 );
222 }
223 Ok(())
224 }
225
226
227 fn add_to_history(&self, queue_item: &QueueItem) -> Result<()> {
228 let config = self
229 .config
230 .select_config()
231 .expect("One of configs should always be selected!");
232 let content = format!(
233 "{}{}{}",
234 config.address,
235 queue_item.uuid,
236 file_extension(&queue_item.local_file)
237 );
238
239 let history = self.database.get_history(None)?;
241 let exists = history.iter().any(|h| h.content.contains(&content));
242
243 if !exists {
244 let history_item = History {
245 content,
246 timestamp: Utc::now().timestamp(),
247 file: queue_item.local_file.clone(),
248 uuid: uuid::Uuid::new_v4().to_string(),
249 };
250 self.database.add_history(&history_item)?;
251 }
252
253 Ok(())
254 }
255}