bestool_psql/
lib.rs

1use std::{
2	collections::VecDeque,
3	io::Write,
4	path::PathBuf,
5	process::Command,
6	sync::{Arc, Mutex},
7	thread,
8	time::Duration,
9};
10
11use completer::SqlCompleter;
12use miette::{miette, IntoDiagnostic, Result};
13use portable_pty::{CommandBuilder, NativePtySystem, PtySize, PtySystem};
14use psql_writer::PsqlWriter;
15use rustyline::{
16	history::{History as _, SearchDirection},
17	Config, Editor,
18};
19use schema_cache::SchemaCacheManager;
20use tempfile::NamedTempFile;
21use thiserror::Error;
22use tracing::{debug, info, trace, warn};
23
24pub use find::find_postgres_bin;
25pub use ots::prompt_for_ots;
26
27mod completer;
28pub mod export;
29mod find;
30pub mod highlighter;
31pub mod history;
32mod ots;
33mod prompt;
34mod psql_writer;
35mod reader;
36mod schema_cache;
37mod terminal;
38
/// Switch the Windows console to the given codepage.
///
/// Both the input codepage (`SetConsoleCP`) and the output codepage
/// (`SetConsoleOutputCP`) are set to `codepage`, which is useful for getting
/// proper UTF-8 display in a Windows console. The return values of the two
/// calls are not inspected, so a failure goes unreported.
#[cfg(windows)]
pub fn set_console_codepage(codepage: u32) {
	use windows_sys::Win32::System::Console::{SetConsoleCP, SetConsoleOutputCP};

	// SAFETY: both calls receive the codepage by value; no pointers are passed.
	unsafe {
		SetConsoleCP(codepage);
		SetConsoleOutputCP(codepage);
	}
}
51
/// Non-Windows stand-in for the console codepage setter.
///
/// The codepage argument is accepted and ignored; nothing happens.
#[cfg(not(windows))]
pub fn set_console_codepage(_codepage: u32) {}
57
58#[derive(Debug, Error)]
59pub enum PsqlError {
60	#[error("psql process terminated unexpectedly")]
61	ProcessTerminated,
62	#[error("failed to read from psql")]
63	ReadError,
64	#[error("failed to write to psql")]
65	WriteError,
66}
67
68/// Configuration for the psql wrapper
69#[derive(Debug, Clone)]
70pub struct PsqlConfig {
71	/// Program executable (typically psql)
72	pub program: String,
73
74	/// Whether to enable write mode
75	pub write: bool,
76
77	/// Arguments to pass to psql
78	pub args: Vec<String>,
79
80	/// Existing psqlrc contents
81	pub psqlrc: String,
82
83	/// Path to the history database
84	pub history_path: PathBuf,
85
86	/// Database user for history tracking
87	pub user: Option<String>,
88
89	/// OTS (Over The Shoulder) value for write mode sessions
90	pub ots: Option<String>,
91
92	/// Whether to launch psql directly without rustyline wrapper (read-only mode only)
93	pub passthrough: bool,
94
95	/// Whether to disable schema completion
96	pub disable_schema_completion: bool,
97
98	/// Syntax highlighting theme
99	pub theme: highlighter::Theme,
100}
101
102impl PsqlConfig {
103	fn psqlrc(&self, boundary: Option<&str>, disable_pager: bool) -> Result<NamedTempFile> {
104		let prompts = if let Some(boundary) = boundary {
105			format!(
106				"\\set PROMPT1 '<<<{boundary}|||1|||%/|||%n|||%#|||%R|||%x>>>'\n\
107				\\set PROMPT2 '<<<{boundary}|||2|||%/|||%n|||%#|||%R|||%x>>>'\n\
108				\\set PROMPT3 '<<<{boundary}|||3|||%/|||%n|||%#|||%R|||%x>>>'\n"
109			)
110		} else {
111			String::new()
112		};
113
114		let pager_setting = if disable_pager {
115			"\\pset pager off\n"
116		} else {
117			""
118		};
119
120		let mut rc = tempfile::Builder::new()
121			.prefix("bestool-psql-")
122			.suffix(".psqlrc")
123			.tempfile()
124			.into_diagnostic()?;
125
126		write!(
127			rc.as_file_mut(),
128			"\\encoding UTF8\n\
129			\\timing\n\
130			{pager_setting}\
131			{existing}\n\
132			{ro}\n\
133			{prompts}",
134			existing = self.psqlrc,
135			ro = if self.write {
136				""
137			} else {
138				"SET SESSION CHARACTERISTICS AS TRANSACTION READ ONLY;"
139			},
140		)
141		.into_diagnostic()?;
142
143		Ok(rc)
144	}
145
146	fn pty_command(self, boundary: Option<&str>) -> Result<(CommandBuilder, NamedTempFile)> {
147		let mut cmd = CommandBuilder::new(crate::find_postgres_bin(&self.program)?);
148
149		if self.write {
150			cmd.arg("--set=AUTOCOMMIT=OFF");
151		}
152
153		// Disable pager on Windows as it doesn't work properly with PTY
154		let rc = self.psqlrc(boundary, cfg!(windows))?;
155		cmd.env("PSQLRC", rc.path());
156
157		if cfg!(windows) {
158			cmd.env("PAGER", "cat");
159		}
160		// On Unix, allow pager - we'll handle stdin forwarding when not at prompt
161
162		for arg in &self.args {
163			cmd.arg(arg);
164		}
165
166		Ok((cmd, rc))
167	}
168
169	fn std_command(
170		self,
171		boundary: Option<&str>,
172		disable_pager: bool,
173	) -> Result<(Command, NamedTempFile)> {
174		let mut cmd = Command::new(crate::find_postgres_bin(&self.program)?);
175
176		if self.write {
177			cmd.arg("--set=AUTOCOMMIT=OFF");
178		}
179
180		let rc = self.psqlrc(boundary, disable_pager)?;
181		cmd.env("PSQLRC", rc.path());
182		if disable_pager {
183			cmd.env("PAGER", "cat");
184		}
185
186		for arg in &self.args {
187			cmd.arg(arg);
188		}
189
190		Ok((cmd, rc))
191	}
192}
193
194/// Set terminal to raw mode for pager interaction
195#[cfg(unix)]
196struct RawMode {
197	term_fd: i32,
198	original_termios: libc::termios,
199	stdin_fd: i32,
200	original_flags: i32,
201}
202
203#[cfg(unix)]
204impl RawMode {
205	fn enable() -> Option<Self> {
206		use std::os::unix::io::AsRawFd;
207
208		let stdin_fd = std::io::stdin().as_raw_fd();
209
210		// Get the controlling terminal
211		let tty_fd = unsafe { libc::open(c"/dev/tty".as_ptr(), libc::O_RDWR) };
212		let term_fd = if tty_fd >= 0 {
213			tty_fd
214		} else {
215			libc::STDOUT_FILENO
216		};
217
218		// Save original terminal settings
219		let mut original_termios: libc::termios = unsafe { std::mem::zeroed() };
220		if unsafe { libc::tcgetattr(term_fd, &mut original_termios) } != 0 {
221			if tty_fd >= 0 {
222				unsafe { libc::close(tty_fd) };
223			}
224			return None;
225		}
226
227		// Save original stdin flags
228		let original_flags = unsafe { libc::fcntl(stdin_fd, libc::F_GETFL) };
229		if original_flags < 0 {
230			if tty_fd >= 0 {
231				unsafe { libc::close(tty_fd) };
232			}
233			return None;
234		}
235
236		// Set raw mode for immediate character input without echo
237		let mut raw_termios = original_termios;
238		unsafe {
239			libc::cfmakeraw(&mut raw_termios);
240			// Explicitly disable echo to prevent doubled input
241			raw_termios.c_lflag &= !libc::ECHO;
242			raw_termios.c_lflag &= !libc::ECHONL;
243			libc::tcsetattr(term_fd, libc::TCSANOW, &raw_termios);
244
245			// Set stdin non-blocking mode
246			libc::fcntl(stdin_fd, libc::F_SETFL, original_flags | libc::O_NONBLOCK);
247		}
248
249		Some(RawMode {
250			term_fd,
251			original_termios,
252			stdin_fd,
253			original_flags,
254		})
255	}
256}
257
258#[cfg(unix)]
259impl Drop for RawMode {
260	fn drop(&mut self) {
261		// Restore original terminal settings
262		unsafe {
263			libc::tcsetattr(self.term_fd, libc::TCSANOW, &self.original_termios);
264			libc::fcntl(self.stdin_fd, libc::F_SETFL, self.original_flags);
265			if self.term_fd != libc::STDOUT_FILENO {
266				libc::close(self.term_fd);
267			}
268		}
269	}
270}
271
272/// Forward stdin to PTY in raw mode for pager interaction
273#[cfg(unix)]
274fn forward_stdin_to_pty(psql_writer: &PsqlWriter) {
275	use std::io::Read;
276
277	let stdin_handle = std::io::stdin();
278	let mut stdin_lock = stdin_handle.lock();
279
280	// Read and forward input
281	let mut buf = [0u8; 1024];
282	match stdin_lock.read(&mut buf) {
283		Ok(n) if n > 0 => {
284			if std::env::var("DEBUG_PTY").is_ok() {
285				use std::io::Write;
286				let data = String::from_utf8_lossy(&buf[..n]);
287				eprintln!("\x1b[33m[FWD]\x1b[0m forwarding {} bytes: {:?}", n, data);
288				std::io::stderr().flush().ok();
289			}
290			if let Err(e) = psql_writer.write_bytes(&buf[..n]) {
291				warn!("failed to forward stdin to pty: {}", e);
292			}
293		}
294		_ => {}
295	}
296}
297
/// Pass at most one pending console key press through to the PTY.
///
/// The console input queue is peeked first so the call never waits. Only
/// key-down events carrying a non-zero character are forwarded, encoded as
/// UTF-8; a UTF-16 code unit that is not a valid `char` on its own is skipped.
/// A failed write to the PTY is logged as a warning. When the `DEBUG_PTY`
/// environment variable is set, the forwarded character is echoed to stderr.
#[cfg(windows)]
fn forward_stdin_to_pty(psql_writer: &PsqlWriter) {
	use windows_sys::Win32::System::Console::{
		GetStdHandle, PeekConsoleInputW, ReadConsoleInputW, INPUT_RECORD, STD_INPUT_HANDLE,
	};

	// SAFETY: the record buffer is a live local array of length 1, matching the
	// length passed to the console calls; the `KeyEvent` union member is only
	// read after `EventType` identifies the record as a key event.
	unsafe {
		let stdin_handle = GetStdHandle(STD_INPUT_HANDLE);
		if stdin_handle.is_null() || stdin_handle as i32 == -1 {
			return;
		}

		let mut records: [INPUT_RECORD; 1] = std::mem::zeroed();

		// Peek to see if there are any console input events available
		let mut pending: u32 = 0;
		let peeked = PeekConsoleInputW(stdin_handle, records.as_mut_ptr(), 1, &mut pending);
		if peeked == 0 || pending == 0 {
			return;
		}

		// Consume the event that was just seen.
		let mut consumed: u32 = 0;
		let fetched = ReadConsoleInputW(stdin_handle, records.as_mut_ptr(), 1, &mut consumed);
		if fetched == 0 || consumed == 0 {
			return;
		}

		let record = &records[0];
		// EventType == 1 means KEY_EVENT
		if record.EventType != 1 {
			return;
		}

		let key_event = record.Event.KeyEvent;
		// Key releases are ignored.
		if key_event.bKeyDown == 0 {
			return;
		}

		let unit = key_event.uChar.UnicodeChar;
		if unit == 0 {
			return;
		}

		let Some(c) = char::from_u32(unit as u32) else {
			return;
		};

		let mut utf8_buf = [0u8; 4];
		let utf8_str = c.encode_utf8(&mut utf8_buf);

		if std::env::var("DEBUG_PTY").is_ok() {
			use std::io::Write;
			eprint!(
				"\x1b[33m[FWD]\x1b[0m forwarding char: {:?}\n",
				utf8_str
			);
			std::io::stderr().flush().ok();
		}

		if let Err(e) = psql_writer.write_bytes(utf8_str.as_bytes()) {
			warn!("failed to forward stdin to pty: {}", e);
		}
	}
}
352
353pub fn run(config: PsqlConfig) -> Result<i32> {
354	// Handle passthrough mode (read-only only)
355	if config.passthrough {
356		if config.write {
357			return Err(miette!(
358				"passthrough mode is only available in read-only mode"
359			));
360		}
361		info!("launching psql in passthrough mode");
362		return run_passthrough(config);
363	}
364
365	// Warn if running in cmd.exe on Windows (output is broken there)
366	#[cfg(windows)]
367	if std::env::var("PSModulePath").is_err() {
368		use tracing::warn;
369		warn!(
370			"Running in cmd.exe detected. Output may be broken. Consider using PowerShell instead."
371		);
372	}
373
374	// Extract theme before config is moved
375	let theme = config.theme;
376
377	let boundary = prompt::generate_boundary();
378	debug!(boundary = %boundary, "generated prompt boundary marker");
379
380	let pty_system = NativePtySystem::default();
381
382	let (cols, rows) = terminal::get_terminal_size();
383
384	let pty_pair = pty_system
385		.openpty(PtySize {
386			rows,
387			cols,
388			pixel_width: 0,
389			pixel_height: 0,
390		})
391		.map_err(|e| miette!("failed to create pty: {}", e))?;
392
393	let pty_master = Arc::new(Mutex::new(pty_pair.master));
394
395	terminal::spawn_resize_handler(pty_master.clone());
396
397	let history_path = config.history_path.clone();
398	let db_user = config.user.clone();
399	let boundary_clone = boundary.clone();
400
401	// Track write mode and OTS as mutable shared state for \W command
402	let write_mode = Arc::new(Mutex::new(config.write));
403	let ots = Arc::new(Mutex::new(config.ots.clone()));
404	let write_mode_clone = write_mode.clone();
405	let ots_clone = ots.clone();
406
407	let disable_schema_completion = config.disable_schema_completion;
408
409	let (cmd, _rc_guard) = config.pty_command(Some(&boundary))?;
410	let mut child = pty_pair
411		.slave
412		.spawn_command(cmd)
413		.map_err(|e| miette!("failed to spawn psql: {}", e))?;
414
415	drop(pty_pair.slave);
416
417	let reader = {
418		let master = pty_master.lock().unwrap();
419		master
420			.try_clone_reader()
421			.map_err(|e| miette!("failed to clone pty reader: {}", e))?
422	};
423
424	let writer = Arc::new(Mutex::new({
425		let master = pty_master.lock().unwrap();
426		master
427			.take_writer()
428			.map_err(|e| miette!("failed to get pty writer: {}", e))?
429	}));
430
431	// Flag to signal termination
432	let running = Arc::new(Mutex::new(true));
433	let running_clone = running.clone();
434
435	// Buffer to accumulate output and track current prompt (ring buffer with max 1024 bytes)
436	let output_buffer = Arc::new(Mutex::new(VecDeque::with_capacity(1024)));
437	let output_buffer_clone = output_buffer.clone();
438
439	let psql_writer = PsqlWriter::new(writer.clone(), output_buffer.clone());
440
441	let current_prompt = Arc::new(Mutex::new(String::new()));
442	let current_prompt_clone = current_prompt.clone();
443
444	// Track the parsed prompt info for transaction state checking
445	let current_prompt_info = Arc::new(Mutex::new(None));
446	let current_prompt_info_clone = current_prompt_info.clone();
447
448	// Track the last input sent to filter out echo
449	let last_input = Arc::new(Mutex::new(String::new()));
450
451	// Control whether output is printed to stdout
452	let print_enabled = Arc::new(Mutex::new(true));
453	let print_enabled_clone = print_enabled.clone();
454
455	#[cfg_attr(
456		windows,
457		expect(
458			unused_variables,
459			reason = "different quit behaviour, see below in the main loop"
460		)
461	)]
462	let reader_thread = reader::spawn_reader_thread(reader::ReaderThreadParams {
463		reader,
464		boundary: boundary_clone,
465		output_buffer: output_buffer_clone,
466		current_prompt: current_prompt_clone,
467		current_prompt_info: current_prompt_info_clone,
468		last_input: last_input.clone(),
469		running: running_clone,
470		print_enabled: print_enabled_clone,
471		writer: writer.clone(),
472	});
473
474	let history = history::History::setup(
475		history_path.clone(),
476		db_user,
477		*write_mode.lock().unwrap(),
478		ots.lock().unwrap().clone(),
479	);
480
481	let schema_cache_manager = if !disable_schema_completion {
482		debug!("initializing schema cache");
483		let manager = SchemaCacheManager::new(
484			writer.clone(),
485			print_enabled.clone(),
486			write_mode.clone(),
487			output_buffer.clone(),
488			boundary.clone(),
489		);
490
491		if let Err(e) = manager.refresh() {
492			warn!("failed to populate schema cache: {}", e);
493		}
494
495		Some(manager)
496	} else {
497		debug!("schema completion disabled by config");
498		None
499	};
500
501	let mut completer =
502		SqlCompleter::with_pty_and_theme(writer.clone(), output_buffer.clone(), theme);
503	if let Some(ref cache_manager) = schema_cache_manager {
504		completer = completer.with_schema_cache(cache_manager.cache_arc());
505	}
506
507	let mut rl: Editor<SqlCompleter, history::History> = Editor::with_history(
508		Config::builder()
509			.auto_add_history(false)
510			.history_ignore_dups(false)
511			.unwrap()
512			.build(),
513		history,
514	)
515	.into_diagnostic()?;
516
517	rl.set_helper(Some(completer));
518
519	let mut last_reload = std::time::Instant::now();
520
521	debug!("entering main event loop");
522
523	#[cfg(unix)]
524	let mut raw_mode: Option<RawMode> = None;
525
526	loop {
527		if last_reload.elapsed() >= Duration::from_secs(60) {
528			debug!("reloading history timestamps");
529			if let Err(e) = rl.history_mut().reload_timestamps() {
530				warn!("failed to reload history timestamps: {}", e);
531			}
532			last_reload = std::time::Instant::now();
533		}
534		match child.try_wait().into_diagnostic()? {
535			Some(status) => {
536				// Process has exited
537				debug!(exit_code = status.exit_code(), "psql process exited");
538				// On Windows, don't wait for reader thread as it may be blocked on PTY read
539				#[cfg(windows)]
540				{
541					*running.lock().unwrap() = false;
542					thread::sleep(Duration::from_millis(100));
543				}
544				#[cfg(not(windows))]
545				{
546					reader_thread.join().ok();
547				}
548				return Ok(status.exit_code() as i32);
549			}
550			None => {
551				// Process still running
552			}
553		}
554
555		// Check if reader thread is still running
556		if !*running.lock().unwrap() {
557			// Reader has stopped, process might have exited
558			thread::sleep(Duration::from_millis(50));
559			if let Some(status) = child.try_wait().into_diagnostic()? {
560				// Reader thread signaled it stopped, process may have exited
561				#[cfg(windows)]
562				{
563					thread::sleep(Duration::from_millis(100));
564				}
565				#[cfg(not(windows))]
566				{
567					reader_thread.join().ok();
568				}
569				return Ok(status.exit_code() as i32);
570			}
571		}
572
573		// Small delay to let output accumulate
574		thread::sleep(Duration::from_millis(50));
575
576		let at_prompt = psql_writer.buffer_contains(&format!("<<<{boundary}|||"));
577		if !at_prompt {
578			// Check if process has exited before forwarding stdin
579			if let Some(status) = child.try_wait().into_diagnostic()? {
580				debug!(
581					exit_code = status.exit_code(),
582					"psql process exited while not at prompt"
583				);
584				// On Windows, don't wait for reader thread as it may be blocked on PTY read
585				#[cfg(windows)]
586				{
587					// Signal the reader to stop
588					*running.lock().unwrap() = false;
589					// Give it a moment to finish, but don't wait indefinitely
590					thread::sleep(Duration::from_millis(100));
591				}
592				#[cfg(not(windows))]
593				{
594					reader_thread.join().ok();
595				}
596				return Ok(status.exit_code() as i32);
597			}
598
599			// Not at a prompt - could be in a pager or query is running
600			// Enable raw mode once and keep it active until we return to prompt
601			#[cfg(unix)]
602			if raw_mode.is_none() {
603				raw_mode = RawMode::enable();
604			}
605
606			// Forward stdin to PTY for pager interaction
607			forward_stdin_to_pty(&psql_writer);
608			thread::sleep(Duration::from_millis(50));
609			continue;
610		}
611
612		// We're at a prompt - disable raw mode if it was enabled
613		#[cfg(unix)]
614		if raw_mode.is_some() {
615			raw_mode = None; // Drop will restore terminal
616		}
617
618		// Use the formatted prompt for readline
619		let prompt_text = current_prompt.lock().unwrap().clone();
620		let readline_prompt = if prompt_text.is_empty() {
621			"psql> ".to_string()
622		} else {
623			prompt_text
624		};
625
626		match rl.readline(&readline_prompt) {
627			Ok(line) => {
628				trace!("received input line");
629				let trimmed = line.trim();
630				if trimmed == "\\e" || trimmed.starts_with("\\e ") {
631					debug!("editor command intercepted");
632
633					// Get the initial content - either from argument or from history
634					let initial_content = if trimmed == "\\e" {
635						// Get the last command from history
636						let hist_len = rl.history().len();
637						if hist_len > 0 {
638							match rl.history().get(hist_len - 1, SearchDirection::Forward) {
639								Ok(Some(result)) => result.entry.to_string(),
640								_ => String::new(),
641							}
642						} else {
643							String::new()
644						}
645					} else {
646						// User provided content after \e
647						trimmed
648							.strip_prefix("\\e ")
649							.unwrap_or("")
650							.trim()
651							.to_string()
652					};
653
654					// Open editor with the content
655					match edit::edit(&initial_content) {
656						Ok(edited_content) => {
657							let edited_trimmed = edited_content.trim();
658
659							// Only send if content is not empty
660							if !edited_trimmed.is_empty() {
661								info!("sending edited content to psql");
662
663								// Add to history
664								if let Err(e) = rl.history_mut().add(&edited_content) {
665									warn!("failed to add history entry: {}", e);
666								} else {
667									debug!("wrote history entry before sending to psql");
668								}
669
670								// Store the input so we can filter out the echo
671								*last_input.lock().unwrap() = format!("{}\n", edited_content);
672
673								// Send to psql
674								if let Err(e) = psql_writer.write_line(&edited_content) {
675									warn!("failed to write to psql: {}", e);
676									return Err(PsqlError::WriteError).into_diagnostic();
677								}
678							} else {
679								debug!("editor returned empty content, skipping");
680							}
681						}
682						Err(e) => {
683							warn!("editor failed: {}", e);
684							eprintln!("Editor failed: {}", e);
685						}
686					}
687					continue;
688				}
689
690				if trimmed == "\\refresh" {
691					let prompt_info = current_prompt_info.lock().unwrap().clone();
692					if let Some(ref info) = prompt_info {
693						if info.in_transaction() {
694							eprintln!("Cannot refresh schema cache while in a transaction. Please COMMIT or ROLLBACK first.");
695							continue;
696						}
697					}
698
699					if let Some(ref cache_manager) = schema_cache_manager {
700						info!("refreshing schema cache...");
701						eprintln!("Refreshing schema cache...");
702						match cache_manager.refresh() {
703							Ok(()) => {
704								eprintln!("Schema cache refreshed successfully");
705							}
706							Err(e) => {
707								warn!("failed to refresh schema cache: {}", e);
708								eprintln!("Failed to refresh schema cache: {}", e);
709							}
710						}
711					} else {
712						eprintln!("Schema cache is not enabled");
713					}
714					continue;
715				}
716
717				if trimmed == "\\W" {
718					let prompt_info = current_prompt_info.lock().unwrap().clone();
719					if let Some(ref info) = prompt_info {
720						if info.in_transaction() && info.transaction == "*" {
721							warn!("Pending transaction! Please COMMIT or ROLLBACK first");
722							continue;
723						}
724					}
725
726					let mut current_write_mode = write_mode_clone.lock().unwrap();
727					let mut current_ots = ots_clone.lock().unwrap();
728
729					if *current_write_mode {
730						*current_write_mode = false;
731						*current_ots = None;
732
733						#[cfg(windows)]
734						let cmd = "SET SESSION CHARACTERISTICS AS TRANSACTION READ ONLY;\r\n\\set AUTOCOMMIT on\r\nROLLBACK;\r\n";
735						#[cfg(not(windows))]
736						let cmd = "SET SESSION CHARACTERISTICS AS TRANSACTION READ ONLY;\n\\set AUTOCOMMIT on\nROLLBACK;\n";
737						if let Err(e) = psql_writer.write_str(cmd) {
738							warn!("failed to write to psql: {}", e);
739							continue;
740						}
741
742						thread::sleep(Duration::from_millis(50));
743						info!("Write mode disabled");
744						thread::sleep(Duration::from_millis(5));
745						eprintln!("SESSION IS NOW READ ONLY");
746
747						let db_user = rl.history().db_user.clone();
748						let sys_user = rl.history().sys_user.clone();
749						rl.history_mut().set_context(db_user, sys_user, false, None);
750					} else {
751						drop(current_write_mode);
752						drop(current_ots);
753
754						let db_handle = rl.history().clone_db();
755						match ots::prompt_for_ots_with_db(Some(db_handle), Some(&history_path)) {
756							Ok(new_ots) => {
757								let mut current_write_mode = write_mode_clone.lock().unwrap();
758								let mut current_ots = ots_clone.lock().unwrap();
759
760								*current_write_mode = true;
761								*current_ots = Some(new_ots.clone());
762
763								#[cfg(windows)]
764								let cmd = "SET SESSION CHARACTERISTICS AS TRANSACTION READ WRITE;\r\n\\set AUTOCOMMIT off\r\nROLLBACK;\r\n";
765								#[cfg(not(windows))]
766								let cmd = "SET SESSION CHARACTERISTICS AS TRANSACTION READ WRITE;\n\\set AUTOCOMMIT off\nROLLBACK;\n";
767								if let Err(e) = psql_writer.write_str(cmd) {
768									warn!("failed to write to psql: {}", e);
769									continue;
770								}
771
772								thread::sleep(Duration::from_millis(50));
773								info!("Write mode enabled");
774								thread::sleep(Duration::from_millis(5));
775								eprintln!("AUTOCOMMIT IS OFF -- REMEMBER TO `COMMIT;` YOUR WRITES");
776
777								let db_user = rl.history().db_user.clone();
778								let sys_user = rl.history().sys_user.clone();
779								rl.history_mut().set_context(
780									db_user,
781									sys_user,
782									true,
783									Some(new_ots),
784								);
785							}
786							Err(e) => {
787								eprintln!("Failed to enable write mode: {}", e);
788							}
789						}
790					}
791					continue;
792				}
793
794				if !line.trim().is_empty() {
795					if let Err(e) = rl.history_mut().add(&line) {
796						warn!("failed to add history entry: {}", e);
797					} else {
798						debug!("wrote history entry before sending to psql");
799					}
800				}
801
802				// Store the input so we can filter out the echo
803				*last_input.lock().unwrap() = format!("{}\n", line);
804
805				if let Err(e) = psql_writer.write_line(&line) {
806					warn!("failed to write to psql: {}", e);
807					return Err(PsqlError::WriteError).into_diagnostic();
808				}
809			}
810			Err(rustyline::error::ReadlineError::Interrupted) => {
811				debug!("received Ctrl-C");
812				psql_writer.send_control(3).ok(); // ASCII ETX (Ctrl-C)
813			}
814			Err(rustyline::error::ReadlineError::Eof) => {
815				debug!("received Ctrl-D (EOF)");
816				#[cfg(windows)]
817				{
818					// On Windows, send \q command instead of Ctrl-D as it's more reliable
819					psql_writer.write_line("\\q").ok();
820				}
821				#[cfg(not(windows))]
822				{
823					psql_writer.send_control(4).ok(); // ASCII EOT (Ctrl-D)
824				}
825				break;
826			}
827			Err(err) => {
828				return Err(err).into_diagnostic();
829			}
830		}
831	}
832
833	// On Windows, don't wait for reader thread as it may be blocked on PTY read
834	#[cfg(windows)]
835	{
836		*running.lock().unwrap() = false;
837		thread::sleep(Duration::from_millis(100));
838	}
839	#[cfg(not(windows))]
840	{
841		reader_thread.join().ok();
842	}
843
844	// On Windows, give the process a chance to exit gracefully, but force kill if needed
845	#[cfg(windows)]
846	let status = {
847		use std::time::Duration;
848
849		// Wait up to 2 seconds for graceful exit
850		let mut attempts = 0;
851		loop {
852			if let Some(status) = child.try_wait().into_diagnostic()? {
853				break status;
854			}
855			if attempts >= 20 {
856				// After 2 seconds, force kill with Ctrl-C
857				debug!("process didn't exit gracefully, sending Ctrl-C");
858				psql_writer.send_control(3).ok();
859				thread::sleep(Duration::from_millis(500));
860				if let Some(status) = child.try_wait().into_diagnostic()? {
861					break status;
862				}
863				// If still not dead, wait indefinitely
864				break child.wait().into_diagnostic()?;
865			}
866			thread::sleep(Duration::from_millis(100));
867			attempts += 1;
868		}
869	};
870
871	#[cfg(not(windows))]
872	let status = child.wait().into_diagnostic()?;
873
874	debug!("compacting history database");
875	if let Err(e) = rl.history_mut().compact() {
876		warn!("failed to compact history database: {}", e);
877	}
878
879	debug!(exit_code = status.exit_code(), "exiting");
880	Ok(status.exit_code() as i32)
881}
882
883/// Run psql in passthrough mode (no rustyline wrapper)
884///
885/// Read-only mode is enforced.
886fn run_passthrough(mut config: PsqlConfig) -> Result<i32> {
887	// explicitly cannot do writes without the protections of the wrapper
888	config.write = false;
889
890	let (mut cmd, _guard) = config.std_command(None, false)?;
891	let status = cmd.status().into_diagnostic()?;
892
893	Ok(status.code().unwrap_or(1))
894}