bestool_psql/
lib.rs

1use std::{
2	collections::VecDeque,
3	io::Write,
4	path::PathBuf,
5	process::Command,
6	sync::{Arc, Mutex},
7	thread,
8	time::Duration,
9};
10
11use completer::SqlCompleter;
12use miette::{miette, IntoDiagnostic, Result};
13use portable_pty::{CommandBuilder, NativePtySystem, PtySize, PtySystem};
14use psql_writer::PsqlWriter;
15use rustyline::{
16	history::{History as _, SearchDirection},
17	Config, Editor,
18};
19use schema_cache::SchemaCacheManager;
20use tempfile::NamedTempFile;
21use thiserror::Error;
22use tracing::{debug, info, trace, warn};
23
24pub use find::find_postgres_bin;
25pub use ots::prompt_for_ots;
26
27mod completer;
28mod find;
29pub mod highlighter;
30pub mod history;
31mod ots;
32mod prompt;
33mod psql_writer;
34mod reader;
35mod schema_cache;
36mod terminal;
37
/// Switch the Windows console to `codepage` for both input and output.
///
/// Useful for getting proper UTF-8 display in the Windows console. The
/// results of the underlying Win32 calls are not inspected. On non-Windows
/// platforms the same function name resolves to a no-op.
#[cfg(windows)]
pub fn set_console_codepage(codepage: u32) {
	use windows_sys::Win32::System::Console::{SetConsoleCP, SetConsoleOutputCP};

	// SAFETY: both calls take a plain integer and no pointers.
	unsafe {
		// Input codepage first, then output codepage.
		SetConsoleCP(codepage);
		SetConsoleOutputCP(codepage);
	}
}
50
/// No-op counterpart of the Windows console codepage setter.
///
/// Lets callers invoke `set_console_codepage` unconditionally; the argument
/// is ignored on non-Windows platforms.
#[cfg(not(windows))]
pub fn set_console_codepage(_codepage: u32) {}
56
/// Errors describing failures in the interaction with the psql child process.
#[derive(Debug, Error)]
pub enum PsqlError {
	/// The psql process went away when it was not expected to.
	#[error("psql process terminated unexpectedly")]
	ProcessTerminated,
	/// Reading psql's output failed.
	#[error("failed to read from psql")]
	ReadError,
	/// Sending input to psql failed (returned by `run` when a line cannot be
	/// written to the PTY).
	#[error("failed to write to psql")]
	WriteError,
}
66
/// Configuration for the psql wrapper
///
/// Consumed by [`run`], which either launches psql directly (passthrough) or
/// wraps it in a PTY with rustyline-based line editing.
#[derive(Debug, Clone)]
pub struct PsqlConfig {
	/// Program executable (typically psql); resolved through
	/// `find_postgres_bin` before being launched
	pub program: String,

	/// Whether to enable write mode
	///
	/// When set, psql is started with `AUTOCOMMIT=OFF`; when unset, the
	/// generated psqlrc makes the session read-only.
	pub write: bool,

	/// Arguments to pass to psql (appended after the wrapper's own arguments)
	pub args: Vec<String>,

	/// Existing psqlrc contents, embedded verbatim in the generated psqlrc
	pub psqlrc: String,

	/// Path to the history database
	pub history_path: PathBuf,

	/// Database user for history tracking
	pub user: Option<String>,

	/// OTS (Over The Shoulder) value for write mode sessions
	pub ots: Option<String>,

	/// Whether to launch psql directly without rustyline wrapper (read-only mode only)
	///
	/// Combining this with `write` makes `run` return an error.
	pub passthrough: bool,

	/// Whether to disable schema completion
	pub disable_schema_completion: bool,

	/// Syntax highlighting theme
	pub theme: highlighter::Theme,
}
100
101impl PsqlConfig {
102	fn psqlrc(&self, boundary: Option<&str>, disable_pager: bool) -> Result<NamedTempFile> {
103		let prompts = if let Some(boundary) = boundary {
104			format!(
105				"\\set PROMPT1 '<<<{boundary}|||1|||%/|||%n|||%#|||%R|||%x>>>'\n\
106				\\set PROMPT2 '<<<{boundary}|||2|||%/|||%n|||%#|||%R|||%x>>>'\n\
107				\\set PROMPT3 '<<<{boundary}|||3|||%/|||%n|||%#|||%R|||%x>>>'\n"
108			)
109		} else {
110			String::new()
111		};
112
113		let pager_setting = if disable_pager {
114			"\\pset pager off\n"
115		} else {
116			""
117		};
118
119		let mut rc = tempfile::Builder::new()
120			.prefix("bestool-psql-")
121			.suffix(".psqlrc")
122			.tempfile()
123			.into_diagnostic()?;
124
125		write!(
126			rc.as_file_mut(),
127			"\\encoding UTF8\n\
128			\\timing\n\
129			{pager_setting}\
130			{existing}\n\
131			{ro}\n\
132			{prompts}",
133			existing = self.psqlrc,
134			ro = if self.write {
135				""
136			} else {
137				"SET SESSION CHARACTERISTICS AS TRANSACTION READ ONLY;"
138			},
139		)
140		.into_diagnostic()?;
141
142		Ok(rc)
143	}
144
145	fn pty_command(self, boundary: Option<&str>) -> Result<(CommandBuilder, NamedTempFile)> {
146		let mut cmd = CommandBuilder::new(crate::find_postgres_bin(&self.program)?);
147
148		if self.write {
149			cmd.arg("--set=AUTOCOMMIT=OFF");
150		}
151
152		// Disable pager on Windows as it doesn't work properly with PTY
153		let rc = self.psqlrc(boundary, cfg!(windows))?;
154		cmd.env("PSQLRC", rc.path());
155
156		if cfg!(windows) {
157			cmd.env("PAGER", "cat");
158		}
159		// On Unix, allow pager - we'll handle stdin forwarding when not at prompt
160
161		for arg in &self.args {
162			cmd.arg(arg);
163		}
164
165		Ok((cmd, rc))
166	}
167
168	fn std_command(
169		self,
170		boundary: Option<&str>,
171		disable_pager: bool,
172	) -> Result<(Command, NamedTempFile)> {
173		let mut cmd = Command::new(crate::find_postgres_bin(&self.program)?);
174
175		if self.write {
176			cmd.arg("--set=AUTOCOMMIT=OFF");
177		}
178
179		let rc = self.psqlrc(boundary, disable_pager)?;
180		cmd.env("PSQLRC", rc.path());
181		if disable_pager {
182			cmd.env("PAGER", "cat");
183		}
184
185		for arg in &self.args {
186			cmd.arg(arg);
187		}
188
189		Ok((cmd, rc))
190	}
191}
192
/// Set terminal to raw mode for pager interaction
///
/// Created by `RawMode::enable`; dropping the value restores the saved
/// terminal attributes and stdin flags.
#[cfg(unix)]
struct RawMode {
	// Descriptor the terminal attributes were read from and applied to:
	// either a freshly opened `/dev/tty` or stdout as a fallback.
	term_fd: i32,
	// Terminal attributes as they were before raw mode was applied.
	original_termios: libc::termios,
	// Descriptor of stdin, whose file status flags are modified.
	stdin_fd: i32,
	// File status flags of stdin before `O_NONBLOCK` was added.
	original_flags: i32,
}
201
202#[cfg(unix)]
203impl RawMode {
204	fn enable() -> Option<Self> {
205		use std::os::unix::io::AsRawFd;
206
207		let stdin_fd = std::io::stdin().as_raw_fd();
208
209		// Get the controlling terminal
210		let tty_fd = unsafe { libc::open(c"/dev/tty".as_ptr(), libc::O_RDWR) };
211		let term_fd = if tty_fd >= 0 {
212			tty_fd
213		} else {
214			libc::STDOUT_FILENO
215		};
216
217		// Save original terminal settings
218		let mut original_termios: libc::termios = unsafe { std::mem::zeroed() };
219		if unsafe { libc::tcgetattr(term_fd, &mut original_termios) } != 0 {
220			if tty_fd >= 0 {
221				unsafe { libc::close(tty_fd) };
222			}
223			return None;
224		}
225
226		// Save original stdin flags
227		let original_flags = unsafe { libc::fcntl(stdin_fd, libc::F_GETFL) };
228		if original_flags < 0 {
229			if tty_fd >= 0 {
230				unsafe { libc::close(tty_fd) };
231			}
232			return None;
233		}
234
235		// Set raw mode for immediate character input without echo
236		let mut raw_termios = original_termios;
237		unsafe {
238			libc::cfmakeraw(&mut raw_termios);
239			// Explicitly disable echo to prevent doubled input
240			raw_termios.c_lflag &= !libc::ECHO;
241			raw_termios.c_lflag &= !libc::ECHONL;
242			libc::tcsetattr(term_fd, libc::TCSANOW, &raw_termios);
243
244			// Set stdin non-blocking mode
245			libc::fcntl(stdin_fd, libc::F_SETFL, original_flags | libc::O_NONBLOCK);
246		}
247
248		Some(RawMode {
249			term_fd,
250			original_termios,
251			stdin_fd,
252			original_flags,
253		})
254	}
255}
256
257#[cfg(unix)]
258impl Drop for RawMode {
259	fn drop(&mut self) {
260		// Restore original terminal settings
261		unsafe {
262			libc::tcsetattr(self.term_fd, libc::TCSANOW, &self.original_termios);
263			libc::fcntl(self.stdin_fd, libc::F_SETFL, self.original_flags);
264			if self.term_fd != libc::STDOUT_FILENO {
265				libc::close(self.term_fd);
266			}
267		}
268	}
269}
270
271/// Forward stdin to PTY in raw mode for pager interaction
272#[cfg(unix)]
273fn forward_stdin_to_pty(psql_writer: &PsqlWriter) {
274	use std::io::Read;
275
276	let stdin_handle = std::io::stdin();
277	let mut stdin_lock = stdin_handle.lock();
278
279	// Read and forward input
280	let mut buf = [0u8; 1024];
281	match stdin_lock.read(&mut buf) {
282		Ok(n) if n > 0 => {
283			if std::env::var("DEBUG_PTY").is_ok() {
284				use std::io::Write;
285				let data = String::from_utf8_lossy(&buf[..n]);
286				eprintln!("\x1b[33m[FWD]\x1b[0m forwarding {} bytes: {:?}", n, data);
287				std::io::stderr().flush().ok();
288			}
289			if let Err(e) = psql_writer.write_bytes(&buf[..n]) {
290				warn!("failed to forward stdin to pty: {}", e);
291			}
292		}
293		_ => {}
294	}
295}
296
/// Forward at most one pending console key press to the PTY.
///
/// Peeks the console input queue and, if an event is available, reads one
/// record. Only key-down events carrying a non-zero character are forwarded,
/// encoded as UTF-8; everything else is discarded. When the `DEBUG_PTY`
/// environment variable is set, the forwarded character is logged to stderr.
#[cfg(windows)]
fn forward_stdin_to_pty(psql_writer: &PsqlWriter) {
	use windows_sys::Win32::System::Console::{
		GetStdHandle, PeekConsoleInputW, ReadConsoleInputW, INPUT_RECORD, STD_INPUT_HANDLE,
	};

	unsafe {
		let stdin_handle = GetStdHandle(STD_INPUT_HANDLE);
		if stdin_handle.is_null() || stdin_handle as i32 == -1 {
			return;
		}

		let mut buffer: [INPUT_RECORD; 1] = std::mem::zeroed();

		// Peek to see if there are any console input events available
		let mut num_events: u32 = 0;
		let peeked = PeekConsoleInputW(stdin_handle, buffer.as_mut_ptr(), 1, &mut num_events);
		if peeked == 0 || num_events == 0 {
			return;
		}

		// Read the input event, removing it from the queue
		let mut num_read: u32 = 0;
		let consumed = ReadConsoleInputW(stdin_handle, buffer.as_mut_ptr(), 1, &mut num_read);
		if consumed == 0 || num_read == 0 {
			return;
		}

		let record = &buffer[0];
		// EventType == 1 means KEY_EVENT
		if record.EventType != 1 {
			return;
		}

		// Only process key down events
		let key_event = record.Event.KeyEvent;
		if key_event.bKeyDown == 0 {
			return;
		}

		let ch = key_event.uChar.UnicodeChar;
		if ch == 0 {
			return;
		}

		// Convert the UTF-16 unit to a char; units that are not a valid
		// scalar value on their own are dropped
		let Some(c) = char::from_u32(ch as u32) else {
			return;
		};

		let mut utf8_buf = [0u8; 4];
		let utf8_str = c.encode_utf8(&mut utf8_buf);

		if std::env::var("DEBUG_PTY").is_ok() {
			use std::io::Write;
			eprint!(
				"\x1b[33m[FWD]\x1b[0m forwarding char: {:?}\n",
				utf8_str
			);
			std::io::stderr().flush().ok();
		}

		if let Err(e) = psql_writer.write_bytes(utf8_str.as_bytes()) {
			warn!("failed to forward stdin to pty: {}", e);
		}
	}
}
351
/// Run a psql session according to `config` and return psql's exit code.
///
/// In passthrough mode psql is launched directly via `run_passthrough`.
/// Otherwise psql is spawned inside a PTY with a generated psqlrc whose
/// prompts embed a freshly generated boundary marker, a reader thread is
/// started on the PTY output, and the main loop alternates between two states:
///
/// - not at a prompt (the boundary marker is not in the output buffer): stdin
///   is forwarded to the PTY, e.g. for pager interaction;
/// - at a prompt: a line is read with rustyline, recorded in history and sent
///   to psql.
///
/// The commands `\e` (edit in external editor), `\refresh` (refresh the schema
/// cache) and `\W` (toggle write mode) are handled by the wrapper itself and
/// are not sent to psql as typed.
///
/// # Errors
///
/// Returns an error when passthrough and write mode are both requested, when
/// the PTY cannot be created or psql cannot be spawned, when the PTY reader or
/// writer cannot be obtained, when polling the child process fails, when a
/// line cannot be written to psql, or when rustyline fails with anything other
/// than an interrupt or EOF.
///
/// # Panics
///
/// Panics if one of the shared mutexes is poisoned (`lock().unwrap()`).
pub fn run(config: PsqlConfig) -> Result<i32> {
	// Handle passthrough mode (read-only only)
	if config.passthrough {
		if config.write {
			return Err(miette!(
				"passthrough mode is only available in read-only mode"
			));
		}
		info!("launching psql in passthrough mode");
		return run_passthrough(config);
	}

	// Warn if running in cmd.exe on Windows (output is broken there)
	// (detection relies on the PSModulePath environment variable being unset)
	#[cfg(windows)]
	if std::env::var("PSModulePath").is_err() {
		use tracing::warn;
		warn!(
			"Running in cmd.exe detected. Output may be broken. Consider using PowerShell instead."
		);
	}

	// Extract theme before config is moved
	let theme = config.theme;

	let boundary = prompt::generate_boundary();
	debug!(boundary = %boundary, "generated prompt boundary marker");

	let pty_system = NativePtySystem::default();

	let (cols, rows) = terminal::get_terminal_size();

	let pty_pair = pty_system
		.openpty(PtySize {
			rows,
			cols,
			pixel_width: 0,
			pixel_height: 0,
		})
		.map_err(|e| miette!("failed to create pty: {}", e))?;

	let pty_master = Arc::new(Mutex::new(pty_pair.master));

	terminal::spawn_resize_handler(pty_master.clone());

	// Copy out everything still needed from config before pty_command consumes it
	let history_path = config.history_path.clone();
	let db_user = config.user.clone();
	let boundary_clone = boundary.clone();

	// Track write mode and OTS as mutable shared state for \W command
	let write_mode = Arc::new(Mutex::new(config.write));
	let ots = Arc::new(Mutex::new(config.ots.clone()));
	let write_mode_clone = write_mode.clone();
	let ots_clone = ots.clone();

	let disable_schema_completion = config.disable_schema_completion;

	// _rc_guard holds the temporary psqlrc for the rest of this function
	let (cmd, _rc_guard) = config.pty_command(Some(&boundary))?;
	let mut child = pty_pair
		.slave
		.spawn_command(cmd)
		.map_err(|e| miette!("failed to spawn psql: {}", e))?;

	// The slave side is no longer needed here once psql has been spawned
	// (presumably so that the reader sees EOF when psql exits — TODO confirm)
	drop(pty_pair.slave);

	let reader = {
		let master = pty_master.lock().unwrap();
		master
			.try_clone_reader()
			.map_err(|e| miette!("failed to clone pty reader: {}", e))?
	};

	let writer = Arc::new(Mutex::new({
		let master = pty_master.lock().unwrap();
		master
			.take_writer()
			.map_err(|e| miette!("failed to get pty writer: {}", e))?
	}));

	// Flag to signal termination
	let running = Arc::new(Mutex::new(true));
	let running_clone = running.clone();

	// Buffer to accumulate output and track current prompt
	// NOTE(review): described as a ring buffer with max 1024 bytes, but only the
	// initial capacity is set here; any size cap must be enforced by the code
	// that pushes into it — confirm
	let output_buffer = Arc::new(Mutex::new(VecDeque::with_capacity(1024)));
	let output_buffer_clone = output_buffer.clone();

	let psql_writer = PsqlWriter::new(writer.clone(), output_buffer.clone());

	let current_prompt = Arc::new(Mutex::new(String::new()));
	let current_prompt_clone = current_prompt.clone();

	// Track the parsed prompt info for transaction state checking
	let current_prompt_info = Arc::new(Mutex::new(None));
	let current_prompt_info_clone = current_prompt_info.clone();

	// Track the last input sent to filter out echo
	let last_input = Arc::new(Mutex::new(String::new()));

	// Control whether output is printed to stdout
	let print_enabled = Arc::new(Mutex::new(true));
	let print_enabled_clone = print_enabled.clone();

	// On Windows the handle is never joined, hence the expected unused-variable lint
	#[cfg_attr(
		windows,
		expect(
			unused_variables,
			reason = "different quit behaviour, see below in the main loop"
		)
	)]
	let reader_thread = reader::spawn_reader_thread(reader::ReaderThreadParams {
		reader,
		boundary: boundary_clone,
		output_buffer: output_buffer_clone,
		current_prompt: current_prompt_clone,
		current_prompt_info: current_prompt_info_clone,
		last_input: last_input.clone(),
		running: running_clone,
		print_enabled: print_enabled_clone,
		writer: writer.clone(),
	});

	let history = history::History::setup(
		history_path.clone(),
		db_user,
		*write_mode.lock().unwrap(),
		ots.lock().unwrap().clone(),
	);

	let schema_cache_manager = if !disable_schema_completion {
		debug!("initializing schema cache");
		let manager = SchemaCacheManager::new(
			writer.clone(),
			print_enabled.clone(),
			write_mode.clone(),
			output_buffer.clone(),
			boundary.clone(),
		);

		// A failed initial refresh is not fatal: the session continues with
		// whatever the cache holds
		if let Err(e) = manager.refresh() {
			warn!("failed to populate schema cache: {}", e);
		}

		Some(manager)
	} else {
		debug!("schema completion disabled by config");
		None
	};

	let mut completer =
		SqlCompleter::with_pty_and_theme(writer.clone(), output_buffer.clone(), theme);
	if let Some(ref cache_manager) = schema_cache_manager {
		completer = completer.with_schema_cache(cache_manager.cache_arc());
	}

	// History entries are added explicitly below, so rustyline's automatic
	// history handling is switched off
	let mut rl: Editor<SqlCompleter, history::History> = Editor::with_history(
		Config::builder()
			.auto_add_history(false)
			.history_ignore_dups(false)
			.unwrap()
			.build(),
		history,
	)
	.into_diagnostic()?;

	rl.set_helper(Some(completer));

	let mut last_reload = std::time::Instant::now();

	debug!("entering main event loop");

	// Guard for the raw terminal mode used while psql is not at a prompt
	#[cfg(unix)]
	let mut raw_mode: Option<RawMode> = None;

	loop {
		// Reload history timestamps at most once a minute
		if last_reload.elapsed() >= Duration::from_secs(60) {
			debug!("reloading history timestamps");
			if let Err(e) = rl.history_mut().reload_timestamps() {
				warn!("failed to reload history timestamps: {}", e);
			}
			last_reload = std::time::Instant::now();
		}
		// NOTE(review): the early returns in this loop exit before the history
		// compaction at the end of the function — confirm this is intended
		match child.try_wait().into_diagnostic()? {
			Some(status) => {
				// Process has exited
				debug!(exit_code = status.exit_code(), "psql process exited");
				// On Windows, don't wait for reader thread as it may be blocked on PTY read
				#[cfg(windows)]
				{
					*running.lock().unwrap() = false;
					thread::sleep(Duration::from_millis(100));
				}
				#[cfg(not(windows))]
				{
					reader_thread.join().ok();
				}
				return Ok(status.exit_code() as i32);
			}
			None => {
				// Process still running
			}
		}

		// Check if reader thread is still running
		if !*running.lock().unwrap() {
			// Reader has stopped, process might have exited
			thread::sleep(Duration::from_millis(50));
			if let Some(status) = child.try_wait().into_diagnostic()? {
				// Reader thread signaled it stopped, process may have exited
				#[cfg(windows)]
				{
					thread::sleep(Duration::from_millis(100));
				}
				#[cfg(not(windows))]
				{
					reader_thread.join().ok();
				}
				return Ok(status.exit_code() as i32);
			}
		}

		// Small delay to let output accumulate
		thread::sleep(Duration::from_millis(50));

		// The prompts configured in the generated psqlrc all start with
		// `<<<{boundary}|||`, so its presence in the buffer means psql is
		// waiting for input
		let at_prompt = psql_writer.buffer_contains(&format!("<<<{boundary}|||"));
		if !at_prompt {
			// Check if process has exited before forwarding stdin
			if let Some(status) = child.try_wait().into_diagnostic()? {
				debug!(
					exit_code = status.exit_code(),
					"psql process exited while not at prompt"
				);
				// On Windows, don't wait for reader thread as it may be blocked on PTY read
				#[cfg(windows)]
				{
					// Signal the reader to stop
					*running.lock().unwrap() = false;
					// Give it a moment to finish, but don't wait indefinitely
					thread::sleep(Duration::from_millis(100));
				}
				#[cfg(not(windows))]
				{
					reader_thread.join().ok();
				}
				return Ok(status.exit_code() as i32);
			}

			// Not at a prompt - could be in a pager or query is running
			// Enable raw mode once and keep it active until we return to prompt
			// (if enabling fails it is attempted again on the next iteration)
			#[cfg(unix)]
			if raw_mode.is_none() {
				raw_mode = RawMode::enable();
			}

			// Forward stdin to PTY for pager interaction
			forward_stdin_to_pty(&psql_writer);
			thread::sleep(Duration::from_millis(50));
			continue;
		}

		// We're at a prompt - disable raw mode if it was enabled
		#[cfg(unix)]
		if raw_mode.is_some() {
			raw_mode = None; // Drop will restore terminal
		}

		// Use the formatted prompt for readline
		let prompt_text = current_prompt.lock().unwrap().clone();
		let readline_prompt = if prompt_text.is_empty() {
			"psql> ".to_string()
		} else {
			prompt_text
		};

		match rl.readline(&readline_prompt) {
			Ok(line) => {
				trace!("received input line");
				let trimmed = line.trim();
				// `\e` opens an external editor; handled here, not by psql
				if trimmed == "\\e" || trimmed.starts_with("\\e ") {
					debug!("editor command intercepted");

					// Get the initial content - either from argument or from history
					let initial_content = if trimmed == "\\e" {
						// Get the last command from history
						let hist_len = rl.history().len();
						if hist_len > 0 {
							match rl.history().get(hist_len - 1, SearchDirection::Forward) {
								Ok(Some(result)) => result.entry.to_string(),
								_ => String::new(),
							}
						} else {
							String::new()
						}
					} else {
						// User provided content after \e
						trimmed
							.strip_prefix("\\e ")
							.unwrap_or("")
							.trim()
							.to_string()
					};

					// Open editor with the content
					match edit::edit(&initial_content) {
						Ok(edited_content) => {
							let edited_trimmed = edited_content.trim();

							// Only send if content is not empty
							if !edited_trimmed.is_empty() {
								info!("sending edited content to psql");

								// Add to history (the untrimmed editor output is what gets stored and sent)
								if let Err(e) = rl.history_mut().add(&edited_content) {
									warn!("failed to add history entry: {}", e);
								} else {
									debug!("wrote history entry before sending to psql");
								}

								// Store the input so we can filter out the echo
								*last_input.lock().unwrap() = format!("{}\n", edited_content);

								// Send to psql
								if let Err(e) = psql_writer.write_line(&edited_content) {
									warn!("failed to write to psql: {}", e);
									return Err(PsqlError::WriteError).into_diagnostic();
								}
							} else {
								debug!("editor returned empty content, skipping");
							}
						}
						Err(e) => {
							warn!("editor failed: {}", e);
							eprintln!("Editor failed: {}", e);
						}
					}
					continue;
				}

				// `\refresh` repopulates the schema cache; refused inside a transaction
				if trimmed == "\\refresh" {
					let prompt_info = current_prompt_info.lock().unwrap().clone();
					if let Some(ref info) = prompt_info {
						if info.in_transaction() {
							eprintln!("Cannot refresh schema cache while in a transaction. Please COMMIT or ROLLBACK first.");
							continue;
						}
					}

					if let Some(ref cache_manager) = schema_cache_manager {
						info!("refreshing schema cache...");
						eprintln!("Refreshing schema cache...");
						match cache_manager.refresh() {
							Ok(()) => {
								eprintln!("Schema cache refreshed successfully");
							}
							Err(e) => {
								warn!("failed to refresh schema cache: {}", e);
								eprintln!("Failed to refresh schema cache: {}", e);
							}
						}
					} else {
						eprintln!("Schema cache is not enabled");
					}
					continue;
				}

				// `\W` toggles between read-only and write mode
				if trimmed == "\\W" {
					// Refuse to toggle while the prompt reports a transaction
					// whose state marker is "*"
					let prompt_info = current_prompt_info.lock().unwrap().clone();
					if let Some(ref info) = prompt_info {
						if info.in_transaction() && info.transaction == "*" {
							warn!("Pending transaction! Please COMMIT or ROLLBACK first");
							continue;
						}
					}

					let mut current_write_mode = write_mode_clone.lock().unwrap();
					let mut current_ots = ots_clone.lock().unwrap();

					if *current_write_mode {
						// NOTE(review): the shared state is updated before the
						// commands are written; if the write below fails, the
						// state says read-only while the session was not changed
						*current_write_mode = false;
						*current_ots = None;

						#[cfg(windows)]
						let cmd = "SET SESSION CHARACTERISTICS AS TRANSACTION READ ONLY;\r\n\\set AUTOCOMMIT on\r\nROLLBACK;\r\n";
						#[cfg(not(windows))]
						let cmd = "SET SESSION CHARACTERISTICS AS TRANSACTION READ ONLY;\n\\set AUTOCOMMIT on\nROLLBACK;\n";
						if let Err(e) = psql_writer.write_str(cmd) {
							warn!("failed to write to psql: {}", e);
							continue;
						}

						thread::sleep(Duration::from_millis(50));
						info!("Write mode disabled");
						thread::sleep(Duration::from_millis(5));
						eprintln!("SESSION IS NOW READ ONLY");

						// Keep the history context in sync with the new mode
						let db_user = rl.history().db_user.clone();
						let sys_user = rl.history().sys_user.clone();
						rl.history_mut().set_context(db_user, sys_user, false, None);
					} else {
						// Release the locks while the user is prompted for the OTS value
						drop(current_write_mode);
						drop(current_ots);

						let db_handle = rl.history().clone_db();
						match ots::prompt_for_ots_with_db(Some(db_handle), Some(&history_path)) {
							Ok(new_ots) => {
								let mut current_write_mode = write_mode_clone.lock().unwrap();
								let mut current_ots = ots_clone.lock().unwrap();

								// NOTE(review): as above, state is updated before
								// the write to psql is known to have succeeded
								*current_write_mode = true;
								*current_ots = Some(new_ots.clone());

								#[cfg(windows)]
								let cmd = "SET SESSION CHARACTERISTICS AS TRANSACTION READ WRITE;\r\n\\set AUTOCOMMIT off\r\nROLLBACK;\r\n";
								#[cfg(not(windows))]
								let cmd = "SET SESSION CHARACTERISTICS AS TRANSACTION READ WRITE;\n\\set AUTOCOMMIT off\nROLLBACK;\n";
								if let Err(e) = psql_writer.write_str(cmd) {
									warn!("failed to write to psql: {}", e);
									continue;
								}

								thread::sleep(Duration::from_millis(50));
								info!("Write mode enabled");
								thread::sleep(Duration::from_millis(5));
								eprintln!("AUTOCOMMIT IS OFF -- REMEMBER TO `COMMIT;` YOUR WRITES");

								// Keep the history context in sync with the new mode
								let db_user = rl.history().db_user.clone();
								let sys_user = rl.history().sys_user.clone();
								rl.history_mut().set_context(
									db_user,
									sys_user,
									true,
									Some(new_ots),
								);
							}
							Err(e) => {
								eprintln!("Failed to enable write mode: {}", e);
							}
						}
					}
					continue;
				}

				// Regular input: record non-blank lines in history, then send
				// the line to psql (blank lines are sent too)
				if !line.trim().is_empty() {
					if let Err(e) = rl.history_mut().add(&line) {
						warn!("failed to add history entry: {}", e);
					} else {
						debug!("wrote history entry before sending to psql");
					}
				}

				// Store the input so we can filter out the echo
				*last_input.lock().unwrap() = format!("{}\n", line);

				if let Err(e) = psql_writer.write_line(&line) {
					warn!("failed to write to psql: {}", e);
					return Err(PsqlError::WriteError).into_diagnostic();
				}
			}
			Err(rustyline::error::ReadlineError::Interrupted) => {
				// Pass the interrupt on to psql and keep the session going
				debug!("received Ctrl-C");
				psql_writer.send_control(3).ok(); // ASCII ETX (Ctrl-C)
			}
			Err(rustyline::error::ReadlineError::Eof) => {
				debug!("received Ctrl-D (EOF)");
				#[cfg(windows)]
				{
					// On Windows, send \q command instead of Ctrl-D as it's more reliable
					psql_writer.write_line("\\q").ok();
				}
				#[cfg(not(windows))]
				{
					psql_writer.send_control(4).ok(); // ASCII EOT (Ctrl-D)
				}
				break;
			}
			Err(err) => {
				return Err(err).into_diagnostic();
			}
		}
	}

	// On Windows, don't wait for reader thread as it may be blocked on PTY read
	#[cfg(windows)]
	{
		*running.lock().unwrap() = false;
		thread::sleep(Duration::from_millis(100));
	}
	#[cfg(not(windows))]
	{
		reader_thread.join().ok();
	}

	// On Windows, give the process a chance to exit gracefully, but force kill if needed
	#[cfg(windows)]
	let status = {
		use std::time::Duration;

		// Wait up to 2 seconds for graceful exit (20 polls, 100 ms apart)
		let mut attempts = 0;
		loop {
			if let Some(status) = child.try_wait().into_diagnostic()? {
				break status;
			}
			if attempts >= 20 {
				// After 2 seconds, send Ctrl-C to prompt psql to exit
				debug!("process didn't exit gracefully, sending Ctrl-C");
				psql_writer.send_control(3).ok();
				thread::sleep(Duration::from_millis(500));
				if let Some(status) = child.try_wait().into_diagnostic()? {
					break status;
				}
				// If still not dead, wait indefinitely
				break child.wait().into_diagnostic()?;
			}
			thread::sleep(Duration::from_millis(100));
			attempts += 1;
		}
	};

	#[cfg(not(windows))]
	let status = child.wait().into_diagnostic()?;

	// A compaction failure is only logged; the exit code is still returned
	debug!("compacting history database");
	if let Err(e) = rl.history_mut().compact() {
		warn!("failed to compact history database: {}", e);
	}

	debug!(exit_code = status.exit_code(), "exiting");
	Ok(status.exit_code() as i32)
}
881
882/// Run psql in passthrough mode (no rustyline wrapper)
883///
884/// Read-only mode is enforced.
885fn run_passthrough(mut config: PsqlConfig) -> Result<i32> {
886	// explicitly cannot do writes without the protections of the wrapper
887	config.write = false;
888
889	let (mut cmd, _guard) = config.std_command(None, false)?;
890	let status = cmd.status().into_diagnostic()?;
891
892	Ok(status.code().unwrap_or(1))
893}