def _quote_name(name):
return '"{0}"'.format(name.replace('"', '""'))
def _quote_value(value):
return "'{0}'".format(value.replace("'", "''"))
def _iterdump(connection, *, filter=None):
writeable_schema = False
cu = connection.cursor()
cu.row_factory = None violations = cu.execute("PRAGMA foreign_key_check").fetchall()
if violations:
yield('PRAGMA foreign_keys=OFF;')
yield('BEGIN TRANSACTION;')
if filter:
filter_name_clause = 'AND "name" LIKE ?'
params = [filter]
else:
filter_name_clause = ""
params = []
q = f"""
SELECT "name", "type", "sql"
FROM "sqlite_master"
WHERE "sql" NOT NULL AND
"type" == 'table'
{filter_name_clause}
ORDER BY "name"
"""
schema_res = cu.execute(q, params)
sqlite_sequence = []
for table_name, type, sql in schema_res.fetchall():
if table_name == 'sqlite_sequence':
rows = cu.execute('SELECT * FROM "sqlite_sequence";')
sqlite_sequence = ['DELETE FROM "sqlite_sequence"']
sqlite_sequence += [
f'INSERT INTO "sqlite_sequence" VALUES({_quote_value(table_name)},{seq_value})'
for table_name, seq_value in rows.fetchall()
]
continue
elif table_name == 'sqlite_stat1':
yield('ANALYZE "sqlite_master";')
elif table_name.startswith('sqlite_'):
continue
elif sql.startswith('CREATE VIRTUAL TABLE'):
if not writeable_schema:
writeable_schema = True
yield('PRAGMA writable_schema=ON;')
yield("INSERT INTO sqlite_master(type,name,tbl_name,rootpage,sql)"
"VALUES('table',{0},{0},0,{1});".format(
_quote_value(table_name),
_quote_value(sql),
))
else:
yield('{0};'.format(sql))
table_name_ident = _quote_name(table_name)
res = cu.execute(f'PRAGMA table_info({table_name_ident})')
column_names = [str(table_info[1]) for table_info in res.fetchall()]
q = "SELECT 'INSERT INTO {0} VALUES('{1}')' FROM {0};".format(
table_name_ident,
"','".join(
"||quote({0})||".format(_quote_name(col)) for col in column_names
)
)
query_res = cu.execute(q)
for row in query_res:
yield("{0};".format(row[0]))
q = f"""
SELECT "name", "type", "sql"
FROM "sqlite_master"
WHERE "sql" NOT NULL AND
"type" IN ('index', 'trigger', 'view')
{filter_name_clause}
"""
schema_res = cu.execute(q, params)
for name, type, sql in schema_res.fetchall():
yield('{0};'.format(sql))
if writeable_schema:
yield('PRAGMA writable_schema=OFF;')
for row in sqlite_sequence:
yield('{0};'.format(row))
yield('COMMIT;')