1use std::io::Write;
26use std::path::{Path, PathBuf};
27use std::process::{Command, Stdio};
28use std::time::Duration;
29
30use serde::{Deserialize, Serialize};
31
32use crate::messaging_plugin_protocol::{
33 CreateDraftParams, DraftEnvelope, DraftState, DraftStatusParams, FetchParams, FetchedMessage,
34 HealthParams, MessagingPluginError, MessagingPluginRequest, MessagingPluginResponse,
35 MESSAGING_PROTOCOL_VERSION,
36};
37
38#[derive(Debug, Clone, Serialize, Deserialize)]
44pub struct MessagingPluginManifest {
45 pub name: String,
47
48 #[serde(default = "default_version")]
50 pub version: String,
51
52 #[serde(rename = "type", default = "default_type")]
54 pub plugin_type: String,
55
56 pub command: String,
58
59 #[serde(default)]
61 pub args: Vec<String>,
62
63 #[serde(default)]
67 pub capabilities: Vec<String>,
68
69 #[serde(default)]
71 pub description: Option<String>,
72
73 #[serde(default = "default_timeout_secs")]
75 pub timeout_secs: u64,
76
77 #[serde(default = "default_protocol_version")]
79 pub protocol_version: u32,
80}
81
/// Serde default for `MessagingPluginManifest::version`.
fn default_version() -> String {
    String::from("0.1.0")
}
85
/// Serde default for `MessagingPluginManifest::plugin_type`.
fn default_type() -> String {
    String::from("messaging")
}
89
/// Serde default for `MessagingPluginManifest::timeout_secs`.
fn default_timeout_secs() -> u64 {
    const DEFAULT_TIMEOUT_SECS: u64 = 60;
    DEFAULT_TIMEOUT_SECS
}
93
/// Serde default for `MessagingPluginManifest::protocol_version`: the
/// `MESSAGING_PROTOCOL_VERSION` constant from `crate::messaging_plugin_protocol`.
fn default_protocol_version() -> u32 {
    MESSAGING_PROTOCOL_VERSION
}
97
98impl MessagingPluginManifest {
99 pub fn load(path: &Path) -> Result<Self, MessagingPluginError> {
101 let content = std::fs::read_to_string(path)?;
102 let manifest: Self = toml::from_str(&content).map_err(|e| {
103 MessagingPluginError::Io(std::io::Error::new(
104 std::io::ErrorKind::InvalidData,
105 format!("invalid manifest at {}: {}", path.display(), e),
106 ))
107 })?;
108 Ok(manifest)
109 }
110}
111
/// Where a messaging plugin was discovered.
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum MessagingPluginSource {
    /// Found under the user's configuration directory (`<config>/ta/plugins/messaging`).
    UserGlobal,
    /// Found under the project's `.ta/plugins/messaging` directory.
    ProjectLocal,
    /// Found as a bare `ta-messaging-<provider>` executable on `PATH`.
    Path,
}
126
127impl std::fmt::Display for MessagingPluginSource {
128 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
129 match self {
130 MessagingPluginSource::UserGlobal => write!(f, "global"),
131 MessagingPluginSource::ProjectLocal => write!(f, "project"),
132 MessagingPluginSource::Path => write!(f, "PATH"),
133 }
134 }
135}
136
137#[derive(Debug, Clone)]
139pub struct DiscoveredMessagingPlugin {
140 pub manifest: MessagingPluginManifest,
142 pub plugin_dir: Option<PathBuf>,
144 pub source: MessagingPluginSource,
146}
147
148pub fn discover_messaging_plugins(project_root: &Path) -> Vec<DiscoveredMessagingPlugin> {
157 let mut plugins = Vec::new();
158
159 if let Some(config_dir) = user_config_dir() {
161 let global_dir = config_dir.join("ta").join("plugins").join("messaging");
162 scan_messaging_plugin_dir(&global_dir, MessagingPluginSource::UserGlobal, &mut plugins);
163 }
164
165 let project_dir = project_root.join(".ta").join("plugins").join("messaging");
167 scan_messaging_plugin_dir(
168 &project_dir,
169 MessagingPluginSource::ProjectLocal,
170 &mut plugins,
171 );
172
173 plugins
174}
175
176fn scan_messaging_plugin_dir(
178 dir: &Path,
179 source: MessagingPluginSource,
180 out: &mut Vec<DiscoveredMessagingPlugin>,
181) {
182 if !dir.is_dir() {
183 return;
184 }
185
186 let entries = match std::fs::read_dir(dir) {
187 Ok(e) => e,
188 Err(e) => {
189 tracing::warn!(
190 dir = %dir.display(),
191 error = %e,
192 "Failed to read messaging plugin directory"
193 );
194 return;
195 }
196 };
197
198 for entry in entries.flatten() {
199 let path = entry.path();
200 if !path.is_dir() {
201 continue;
202 }
203
204 let manifest_path = path.join("plugin.toml");
205 if !manifest_path.exists() {
206 continue;
207 }
208
209 match MessagingPluginManifest::load(&manifest_path) {
210 Ok(manifest) => {
211 tracing::debug!(
212 plugin = %manifest.name,
213 source = %source,
214 "Discovered messaging plugin"
215 );
216 out.push(DiscoveredMessagingPlugin {
217 manifest,
218 plugin_dir: Some(path),
219 source: source.clone(),
220 });
221 }
222 Err(e) => {
223 tracing::warn!(
224 path = %manifest_path.display(),
225 error = %e,
226 "Skipping invalid messaging plugin manifest"
227 );
228 }
229 }
230 }
231}
232
233pub fn find_messaging_plugin(
238 provider: &str,
239 project_root: &Path,
240) -> Option<DiscoveredMessagingPlugin> {
241 let all = discover_messaging_plugins(project_root);
243 if let Some(p) = all.into_iter().find(|p| p.manifest.name == provider) {
244 return Some(p);
245 }
246
247 let bare_cmd = format!("ta-messaging-{}", provider);
249 if which_on_path(&bare_cmd) {
250 tracing::info!(
251 provider = %provider,
252 command = %bare_cmd,
253 "Found messaging plugin as bare executable on PATH"
254 );
255 return Some(DiscoveredMessagingPlugin {
256 manifest: MessagingPluginManifest {
257 name: provider.to_string(),
258 version: "unknown".to_string(),
259 plugin_type: "messaging".to_string(),
260 command: bare_cmd,
261 args: vec![],
262 capabilities: vec![
263 "fetch".to_string(),
264 "create_draft".to_string(),
265 "draft_status".to_string(),
266 "health".to_string(),
267 ],
268 description: None,
269 timeout_secs: 60,
270 protocol_version: MESSAGING_PROTOCOL_VERSION,
271 },
272 plugin_dir: None,
273 source: MessagingPluginSource::Path,
274 });
275 }
276
277 None
278}
279
/// Adapter that performs messaging operations by running an external plugin
/// process once per call and exchanging one JSON line in each direction.
#[derive(Debug)]
pub struct ExternalMessagingAdapter {
    /// Command string from the manifest; split on whitespace when spawning.
    command: String,
    /// Additional arguments appended after those embedded in `command`.
    args: Vec<String>,
    /// Provider name, used in error values.
    provider: String,
    /// Time limit handed to the wait helper for each plugin invocation.
    timeout: Duration,
}
299
300impl ExternalMessagingAdapter {
301 pub fn new(manifest: &MessagingPluginManifest) -> Self {
303 Self {
304 command: manifest.command.clone(),
305 args: manifest.args.clone(),
306 provider: manifest.name.clone(),
307 timeout: Duration::from_secs(manifest.timeout_secs),
308 }
309 }
310
311 pub fn provider(&self) -> &str {
313 &self.provider
314 }
315
316 pub fn fetch(
320 &self,
321 since_iso8601: &str,
322 account: Option<&str>,
323 limit: Option<u32>,
324 ) -> Result<Vec<FetchedMessage>, MessagingPluginError> {
325 let req = MessagingPluginRequest::Fetch(FetchParams {
326 since: since_iso8601.to_string(),
327 account: account.map(str::to_string),
328 limit,
329 });
330 let resp = self.call_plugin(&req, "fetch")?;
331 Ok(resp.messages.unwrap_or_default())
332 }
333
334 pub fn create_draft(&self, draft: DraftEnvelope) -> Result<String, MessagingPluginError> {
341 let req = MessagingPluginRequest::CreateDraft(CreateDraftParams { draft });
342 let resp = self.call_plugin(&req, "create_draft")?;
343 resp.draft_id
344 .ok_or_else(|| MessagingPluginError::InvalidResponse {
345 name: self.provider.clone(),
346 op: "create_draft".to_string(),
347 reason: "response missing draft_id".to_string(),
348 })
349 }
350
351 pub fn draft_status(&self, draft_id: &str) -> Result<DraftState, MessagingPluginError> {
353 let req = MessagingPluginRequest::DraftStatus(DraftStatusParams {
354 draft_id: draft_id.to_string(),
355 });
356 let resp = self.call_plugin(&req, "draft_status")?;
357 Ok(resp.state.unwrap_or(DraftState::Unknown))
358 }
359
360 pub fn health(&self) -> Result<(String, String), MessagingPluginError> {
364 let req = MessagingPluginRequest::Health(HealthParams {});
365 let resp = self.call_plugin(&req, "health")?;
366 let address = resp.address.unwrap_or_else(|| "<unknown>".to_string());
367 let provider = resp.provider.unwrap_or_else(|| self.provider.clone());
368 Ok((address, provider))
369 }
370
371 fn call_plugin(
376 &self,
377 req: &MessagingPluginRequest,
378 op: &str,
379 ) -> Result<MessagingPluginResponse, MessagingPluginError> {
380 let req_json = serde_json::to_string(req)?;
381
382 let mut parts = self.command.split_whitespace();
383 let program = parts
384 .next()
385 .ok_or_else(|| MessagingPluginError::SpawnFailed {
386 command: self.command.clone(),
387 reason: "command string is empty".to_string(),
388 })?;
389
390 let mut cmd = Command::new(program);
391 for arg in parts {
392 cmd.arg(arg);
393 }
394 for arg in &self.args {
395 cmd.arg(arg);
396 }
397 cmd.stdin(Stdio::piped())
398 .stdout(Stdio::piped())
399 .stderr(Stdio::piped());
400
401 let mut child = cmd.spawn().map_err(|e| MessagingPluginError::SpawnFailed {
402 command: self.command.clone(),
403 reason: e.to_string(),
404 })?;
405
406 if let Some(mut stdin) = child.stdin.take() {
408 stdin
409 .write_all(req_json.as_bytes())
410 .and_then(|_| stdin.write_all(b"\n"))
411 .map_err(|e| {
412 MessagingPluginError::Io(std::io::Error::new(
413 e.kind(),
414 format!("failed to write to plugin stdin: {}", e),
415 ))
416 })?;
417 }
418
419 let timeout_ms = self.timeout.as_millis() as u64;
421 let output =
422 wait_with_timeout(child, timeout_ms).map_err(|_| MessagingPluginError::Timeout {
423 name: self.provider.clone(),
424 op: op.to_string(),
425 timeout_secs: self.timeout.as_secs(),
426 })?;
427
428 if !output.status.success() {
429 let stderr = String::from_utf8_lossy(&output.stderr);
430 return Err(MessagingPluginError::OpFailed {
431 name: self.provider.clone(),
432 op: op.to_string(),
433 reason: format!(
434 "plugin exited with status {}. stderr: {}",
435 output.status,
436 stderr.trim()
437 ),
438 });
439 }
440
441 let stdout = String::from_utf8_lossy(&output.stdout);
442 let first_line = stdout.lines().next().unwrap_or("").trim();
443
444 if first_line.is_empty() {
445 return Err(MessagingPluginError::InvalidResponse {
446 name: self.provider.clone(),
447 op: op.to_string(),
448 reason: "plugin produced no output (expected one JSON line)".to_string(),
449 });
450 }
451
452 let resp: MessagingPluginResponse = serde_json::from_str(first_line).map_err(|e| {
453 MessagingPluginError::InvalidResponse {
454 name: self.provider.clone(),
455 op: op.to_string(),
456 reason: format!(
457 "invalid JSON: {}. Got: '{}'",
458 e,
459 if first_line.len() > 200 {
460 &first_line[..200]
461 } else {
462 first_line
463 }
464 ),
465 }
466 })?;
467
468 if !resp.ok {
469 return Err(MessagingPluginError::OpFailed {
470 name: self.provider.clone(),
471 op: op.to_string(),
472 reason: resp
473 .error
474 .unwrap_or_else(|| "plugin returned ok=false".to_string()),
475 });
476 }
477
478 Ok(resp)
479 }
480}
481
/// Reports whether an executable called `name` exists in any `PATH` directory.
///
/// A candidate must be a regular file (symlinks are followed). On Unix it must
/// also have at least one execute permission bit set, so a non-executable file
/// that merely shares the name is not reported as a usable plugin. On other
/// platforms any regular file matches.
///
/// Returns `false` when `PATH` is unset.
fn which_on_path(name: &str) -> bool {
    #[cfg(unix)]
    fn has_exec_bit(metadata: &std::fs::Metadata) -> bool {
        use std::os::unix::fs::PermissionsExt;
        metadata.permissions().mode() & 0o111 != 0
    }

    #[cfg(not(unix))]
    fn has_exec_bit(_metadata: &std::fs::Metadata) -> bool {
        true
    }

    fn is_executable_file(candidate: &Path) -> bool {
        match std::fs::metadata(candidate) {
            Ok(metadata) => metadata.is_file() && has_exec_bit(&metadata),
            Err(_) => false,
        }
    }

    std::env::var_os("PATH")
        .map(|path_var| {
            std::env::split_paths(&path_var).any(|dir| is_executable_file(&dir.join(name)))
        })
        .unwrap_or(false)
}
492
/// Resolves the user's configuration directory.
///
/// Follows the XDG Base Directory rules: `$XDG_CONFIG_HOME` is used when it is
/// set to an absolute path; an unset, empty or relative value is ignored and
/// `$HOME/.config` is used instead. Returns `None` when neither variable
/// yields a usable path.
///
/// Values are read with `var_os`, so non-UTF-8 paths are accepted.
fn user_config_dir() -> Option<PathBuf> {
    let xdg = std::env::var_os("XDG_CONFIG_HOME")
        .map(PathBuf::from)
        // Empty and relative values would otherwise resolve against the
        // current working directory.
        .filter(|dir| dir.is_absolute());
    if xdg.is_some() {
        return xdg;
    }

    std::env::var_os("HOME")
        .filter(|home| !home.is_empty())
        .map(|home| PathBuf::from(home).join(".config"))
}
502
/// Waits for `child` to exit and collects its stdout and stderr, giving up
/// after `timeout_ms` milliseconds.
///
/// Any stdin handle still attached to `child` is closed first. Both output
/// pipes are drained on background threads so a chatty child cannot block on
/// a full pipe while this function polls for its exit.
///
/// # Errors
///
/// Returns `Err` with a description when:
///
/// * the deadline passes; the child is killed and reaped before returning;
/// * polling the child's status fails; the child is likewise killed and reaped;
/// * reading one of the output pipes fails.
///
/// The child is terminated through `Child::kill`, so the timeout is enforced
/// on every platform and no signal is ever sent to an already-reaped PID.
fn wait_with_timeout(
    child: std::process::Child,
    timeout_ms: u64,
) -> std::result::Result<std::process::Output, String> {
    use std::time::Instant;

    type Reader = std::thread::JoinHandle<std::io::Result<Vec<u8>>>;

    /// Reads `pipe` to EOF on a background thread.
    fn drain<R>(pipe: Option<R>) -> Option<Reader>
    where
        R: std::io::Read + Send + 'static,
    {
        pipe.map(|mut pipe| {
            std::thread::spawn(move || {
                let mut buf = Vec::new();
                pipe.read_to_end(&mut buf).map(|_| buf)
            })
        })
    }

    /// Joins a reader started by `drain` and returns the bytes it collected.
    fn collect(reader: Option<Reader>, stream: &str) -> std::result::Result<Vec<u8>, String> {
        let handle = match reader {
            Some(handle) => handle,
            None => return Ok(Vec::new()),
        };
        match handle.join() {
            Ok(Ok(bytes)) => Ok(bytes),
            Ok(Err(e)) => Err(format!(
                "wait_with_output failed: reading {}: {}",
                stream, e
            )),
            Err(_) => Err(format!(
                "wait_with_output failed: {} reader panicked",
                stream
            )),
        }
    }

    let max_poll_interval = Duration::from_millis(25);

    let mut child = child;
    // No further input will be sent; closing stdin lets the child see EOF.
    drop(child.stdin.take());
    let stdout_reader = drain(child.stdout.take());
    let stderr_reader = drain(child.stderr.take());

    // `None` means the deadline is not representable, i.e. effectively no timeout.
    let deadline = Instant::now().checked_add(Duration::from_millis(timeout_ms));
    // Start with a short poll so fast plugins return promptly, then back off.
    let mut poll_interval = Duration::from_millis(1);

    let status = loop {
        match child.try_wait() {
            Ok(Some(status)) => break status,
            Ok(None) => {}
            Err(e) => {
                let _ = child.kill();
                let _ = child.wait();
                return Err(format!("wait_with_output failed: {}", e));
            }
        }

        let now = Instant::now();
        let remaining = match deadline {
            Some(deadline) if now >= deadline => {
                let _ = child.kill();
                // Reap the child so it does not linger as a zombie. The reader
                // threads are deliberately not joined here: they finish on
                // their own once the pipes close.
                let _ = child.wait();
                return Err(format!("plugin timed out after {} ms", timeout_ms));
            }
            Some(deadline) => deadline.duration_since(now),
            None => max_poll_interval,
        };

        std::thread::sleep(poll_interval.min(remaining));
        poll_interval = (poll_interval * 2).min(max_poll_interval);
    };

    let stdout = collect(stdout_reader, "stdout")?;
    let stderr = collect(stderr_reader, "stderr")?;

    Ok(std::process::Output {
        status,
        stdout,
        stderr,
    })
}
537
#[cfg(test)]
mod tests {
    use super::*;
    use std::path::Path;

    /// Writes `content` as the `plugin.toml` manifest inside `dir`.
    fn write_manifest(dir: &Path, content: &str) {
        let manifest_path = dir.join("plugin.toml");
        std::fs::write(manifest_path, content).unwrap();
    }

    /// Project-local messaging plugin directory below `root`.
    fn messaging_dir(root: &Path) -> std::path::PathBuf {
        root.join(".ta").join("plugins").join("messaging")
    }

    /// Creates `<msg_dir>/<dir_name>/plugin.toml` with the given contents.
    fn add_plugin(msg_dir: &Path, dir_name: &str, manifest: &str) {
        let plugin_dir = msg_dir.join(dir_name);
        std::fs::create_dir_all(&plugin_dir).unwrap();
        write_manifest(&plugin_dir, manifest);
    }

    #[test]
    fn discover_messaging_plugins_finds_manifests() {
        let root = tempfile::tempdir().unwrap();
        add_plugin(
            &messaging_dir(root.path()),
            "gmail",
            r#"
name = "gmail"
version = "0.1.0"
type = "messaging"
command = "ta-messaging-gmail"
capabilities = ["fetch", "create_draft", "draft_status", "health"]
description = "Gmail messaging adapter"
"#,
        );

        let plugins = discover_messaging_plugins(root.path());
        assert_eq!(plugins.len(), 1);
        let only = &plugins[0];
        assert_eq!(only.manifest.name, "gmail");
        assert_eq!(only.source, MessagingPluginSource::ProjectLocal);
    }

    #[test]
    fn discover_messaging_plugins_skips_invalid_manifest() {
        let root = tempfile::tempdir().unwrap();
        let msg_dir = messaging_dir(root.path());

        // One well-formed plugin ...
        add_plugin(
            &msg_dir,
            "gmail",
            r#"name = "gmail"
type = "messaging"
command = "ta-messaging-gmail"
"#,
        );
        // ... next to one whose manifest is not TOML at all.
        add_plugin(&msg_dir, "bad", "{{not valid toml}}");

        let plugins = discover_messaging_plugins(root.path());
        assert_eq!(plugins.len(), 1);
        assert_eq!(plugins[0].manifest.name, "gmail");
    }

    #[test]
    fn discover_messaging_plugins_empty_dir_returns_empty() {
        let root = tempfile::tempdir().unwrap();
        assert!(discover_messaging_plugins(root.path()).is_empty());
    }

    #[test]
    fn find_messaging_plugin_project_local() {
        let root = tempfile::tempdir().unwrap();
        add_plugin(
            &messaging_dir(root.path()),
            "imap",
            r#"name = "imap"
type = "messaging"
command = "ta-messaging-imap"
"#,
        );

        let found_name = find_messaging_plugin("imap", root.path()).map(|p| p.manifest.name);
        assert_eq!(found_name.as_deref(), Some("imap"));
    }

    #[test]
    fn find_messaging_plugin_missing_returns_none() {
        let root = tempfile::tempdir().unwrap();
        assert!(find_messaging_plugin("nonexistent-provider", root.path()).is_none());
    }

    #[test]
    fn messaging_plugin_source_display() {
        let expected = [
            (MessagingPluginSource::UserGlobal, "global"),
            (MessagingPluginSource::ProjectLocal, "project"),
            (MessagingPluginSource::Path, "PATH"),
        ];
        for (source, label) in expected.iter() {
            assert_eq!(source.to_string(), *label);
        }
    }

    /// Path of a mock plugin shell script shared by all adapter tests.
    ///
    /// The script is written once per test process. It is fully written,
    /// synced and closed before being marked executable; keep that order.
    #[cfg(unix)]
    fn shared_mock_plugin_path() -> &'static std::path::Path {
        use std::io::Write;
        use std::os::unix::fs::PermissionsExt;
        use std::sync::OnceLock;

        static MOCK_PATH: OnceLock<std::path::PathBuf> = OnceLock::new();
        MOCK_PATH.get_or_init(|| {
            let name = format!("ta-msg-mock-shared-{}", std::process::id());

            #[cfg(target_os = "linux")]
            let path = {
                let shm = std::path::Path::new("/dev/shm");
                let base = if shm.exists() {
                    shm.to_path_buf()
                } else {
                    std::path::PathBuf::from("/tmp")
                };
                base.join(&name)
            };
            #[cfg(not(target_os = "linux"))]
            let path = std::env::temp_dir().join(&name);

            {
                let mut script = std::fs::File::create(&path).unwrap();
                script
                    .write_all(
                        br#"#!/bin/sh
read -r line
case "$line" in
  *create_draft*) echo '{"ok":true,"draft_id":"mock-draft-abc123"}' ;;
  *fetch*) echo '{"ok":true,"messages":[]}' ;;
  *) echo '{"ok":true,"address":"me@example.com","provider":"mock"}' ;;
esac
"#,
                    )
                    .unwrap();
                script.sync_all().unwrap();
                // `script` is closed here, before the file is made executable.
            }

            let mut perms = std::fs::metadata(&path).unwrap().permissions();
            perms.set_mode(0o755);
            std::fs::set_permissions(&path, perms).unwrap();
            let _ = std::fs::metadata(&path).unwrap();
            path
        })
    }

    /// Manifest pointing at the shared mock plugin with a single capability.
    #[cfg(unix)]
    fn mock_manifest(capability: &str) -> MessagingPluginManifest {
        MessagingPluginManifest {
            name: "mock".to_string(),
            version: "0.1.0".to_string(),
            plugin_type: "messaging".to_string(),
            command: shared_mock_plugin_path().display().to_string(),
            args: vec![],
            capabilities: vec![capability.to_string()],
            description: None,
            timeout_secs: 30,
            protocol_version: MESSAGING_PROTOCOL_VERSION,
        }
    }

    #[cfg(unix)]
    #[test]
    fn external_adapter_calls_mock_plugin() {
        let adapter = ExternalMessagingAdapter::new(&mock_manifest("health"));
        let (address, provider) = adapter.health().unwrap();
        assert_eq!(address, "me@example.com");
        assert_eq!(provider, "mock");
    }

    #[cfg(unix)]
    #[test]
    fn external_adapter_create_draft_returns_id() {
        let adapter = ExternalMessagingAdapter::new(&mock_manifest("create_draft"));
        let envelope = DraftEnvelope {
            to: "bob@example.com".to_string(),
            subject: "Hello".to_string(),
            body_html: "<p>Hi!</p>".to_string(),
            in_reply_to: None,
            thread_id: None,
            body_text: None,
        };
        let draft_id = adapter.create_draft(envelope).unwrap();
        assert_eq!(draft_id, "mock-draft-abc123");
    }
}