import sqlite3
import sys
from argparse import ArgumentParser
from code import InteractiveConsole
from textwrap import dedent
def execute(c, sql, suppress_errors=True):
    """Run *sql* on *c* and print every resulting row to stdout.

    Args:
        c: Object whose ``execute(sql)`` returns an iterable of rows; the
            callers in this file pass either a sqlite3 connection or a
            sqlite3 cursor.
        sql: The SQL text to execute.
        suppress_errors: When true (the default), an ``sqlite3.Error`` is
            reported on stderr and the function returns normally.  When
            false, the error is reported and the process then exits.

    Raises:
        SystemExit: With status 1 when an ``sqlite3.Error`` occurs and
            *suppress_errors* is false.
    """
    try:
        for row in c.execute(sql):
            print(row)
    except sqlite3.Error as exc:
        kind = type(exc).__name__
        # Guard only the attribute lookup: ``sqlite_errorname`` may be absent
        # on the exception, and an AttributeError raised while formatting or
        # writing the message must not be mistaken for that case.
        try:
            errorname = exc.sqlite_errorname
        except AttributeError:
            message = f"{kind}: {exc}"
        else:
            message = f"{kind} ({errorname}): {exc}"
        print(message, file=sys.stderr)
        if not suppress_errors:
            sys.exit(1)
class SqliteInteractiveConsole(InteractiveConsole):
    """Interactive console that sends its input to SQLite instead of Python."""

    def __init__(self, connection):
        """Keep *connection* and open the cursor used to run statements."""
        super().__init__()
        self._con = connection
        self._cur = connection.cursor()

    def runsource(self, source, filename="<input>", symbol="single"):
        """Process one chunk of console input.

        Input starting with "." is treated as a shell command (``version``,
        ``help``, ``quit``); anything else is treated as SQL and executed on
        the console's cursor once it forms a complete statement.

        Args:
            source: The text collected by the console so far.
            filename: Unused here; kept for signature compatibility.
            symbol: Unused here; kept for signature compatibility.

        Returns:
            True if *source* is an incomplete SQL statement (more input is
            needed), False in every other case.

        Raises:
            SystemExit: With status 0 on the ``.quit`` command.
        """
        # Empty or whitespace-only input: nothing to do.
        if not source or source.isspace():
            return False

        # Plain SQL: wait for a complete statement, then run it.
        if not source.startswith("."):
            if sqlite3.complete_statement(source):
                execute(self._cur, source)
                return False
            return True

        # Dot-command: dispatch on the text that follows the leading ".".
        command = source[1:].strip()
        if command == "version":
            print(f"{sqlite3.sqlite_version}")
        elif command == "help":
            print("Enter SQL code and press enter.")
        elif command == "quit":
            sys.exit(0)
        elif command:
            # A bare "." is silently ignored; anything else is reported.
            self.write("Error: unknown command or invalid arguments:"
                       f' "{command}".\n')
        return False
def main(*args):
    """Entry point of the sqlite3 command-line interface.

    Parses the command line, opens the requested database and then either
    runs a single SQL query or starts an interactive shell.

    Args:
        *args: Forwarded unchanged to ``ArgumentParser.parse_args``.  The
            script entry point passes one value: the list of command-line
            arguments without the program name.

    Raises:
        SystemExit: With status 0 once the query or the interactive session
            has finished.  A failing one-shot query exits with status 1
            instead (via ``execute(..., suppress_errors=False)``), and
            argparse may exit on its own, e.g. for ``--version``.
    """
    parser = ArgumentParser(
        description="Python sqlite3 CLI",
        color=True,
    )
    parser.add_argument(
        "filename", type=str, default=":memory:", nargs="?",
        help=(
            "SQLite database to open (defaults to ':memory:'). "
            "A new database is created if the file does not previously exist."
        ),
    )
    parser.add_argument(
        "sql", type=str, nargs="?",
        help=(
            "An SQL query to execute. "
            "Any returned rows are printed to stdout."
        ),
    )
    parser.add_argument(
        "-v", "--version", action="version",
        version=f"SQLite version {sqlite3.sqlite_version}",
        help="Print underlying SQLite library version",
    )
    args = parser.parse_args(*args)

    # Human-readable description of the database, used in the banner.
    if args.filename == ":memory:":
        db_name = "a transient in-memory database"
    else:
        db_name = repr(args.filename)

    # Key combination advertised for quitting: CTRL-Z on Windows unless
    # idlelib.run is loaded, CTRL-D everywhere else.
    if sys.platform == "win32" and "idlelib.run" not in sys.modules:
        eofkey = "CTRL-Z"
    else:
        eofkey = "CTRL-D"

    banner = dedent(f"""
        sqlite3 shell, running on SQLite version {sqlite3.sqlite_version}
        Connected to {db_name}

        Each command will be run using execute() on the cursor.
        Type ".help" for more information; type ".quit" or {eofkey} to quit.
    """).strip()
    sys.ps1 = "sqlite> "
    sys.ps2 = "    ... "

    con = sqlite3.connect(args.filename, isolation_level=None)
    try:
        if args.sql:
            # One-shot mode: run the query; any SQLite error exits with 1.
            execute(con, args.sql, suppress_errors=False)
        else:
            # Interactive mode.
            console = SqliteInteractiveConsole(con)
            # readline is optional and imported only for its side effect
            # (presumably line editing for the prompt); the shell works
            # without it, so a missing module is deliberately ignored.
            try:
                import readline  # noqa: F401
            except ImportError:
                pass
            console.interact(banner, exitmsg="")
    finally:
        # Close the connection on every exit path, including SystemExit.
        con.close()

    sys.exit(0)
# Script entry point: pass the command-line arguments, without the program
# name, to main() as a single list.
if __name__ == "__main__":
    cli_args = sys.argv[1:]
    main(cli_args)