use super::predict::{
classify_user_input_source, extract_sql_safe_reason, extract_sql_vulnerable_source,
is_safe_django_orm_method, is_sql_sink_method, is_sqlalchemy_text_function,
is_unsafe_raw_django_orm_method, line_contains_sql_keyword, matches_route_handler_decorator,
matches_route_handler_name, matches_trust_boundary_name, Evidence, SqlApi, UserInputSource,
};
use crate::detectors::security::ast_helpers::{enclosing_python_function, node_text};
use tree_sitter::Node;
/// One recognised SQL call site, as produced by `collect_python_sql_sites`.
pub(super) struct PythonSqlSite<'a> {
/// The tree-sitter `call` node for the site.
pub call_node: Node<'a>,
/// Classification of the call's argument shape (result of `classify_sql_shape`).
pub api: SqlApi,
/// Source text of the call's `function` field, e.g. `cursor.execute`.
pub callee_label: String,
}
/// Walks every node under `module_root` and returns one [`PythonSqlSite`] per
/// `call` node whose callee text is recognised by `recognized_sql_call_kind`.
///
/// The traversal uses an explicit stack, so sites are reported in stack-pop
/// order rather than source order.
pub(super) fn collect_python_sql_sites<'a>(
    module_root: Node<'a>,
    source: &'a [u8],
) -> Vec<PythonSqlSite<'a>> {
    let mut found = Vec::new();
    let mut pending: Vec<Node<'_>> = vec![module_root];

    while let Some(current) = pending.pop() {
        // Queue the children first so nested calls are visited even when the
        // current node is skipped below.
        let mut walker = current.walk();
        pending.extend(current.children(&mut walker));

        if current.kind() != "call" {
            continue;
        }

        // A call without a readable, non-empty callee cannot be classified.
        let callee = match current
            .child_by_field_name("function")
            .map(|func| node_text(func, source).unwrap_or(""))
        {
            Some(text) if !text.is_empty() => text,
            _ => continue,
        };

        let kind = recognized_sql_call_kind(callee);
        if matches!(kind, RecognizedSqlCallKind::NotRecognized) {
            continue;
        }

        found.push(PythonSqlSite {
            call_node: current,
            api: classify_sql_shape(current, source, kind),
            callee_label: callee.to_string(),
        });
    }

    found
}
/// How `recognized_sql_call_kind` categorised a callee's source text.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(super) enum RecognizedSqlCallKind {
/// Dotted callee without `.objects.` whose last segment passes `is_sql_sink_method`.
GenericSink,
/// Callee containing `.objects.` whose last segment passes `is_safe_django_orm_method`.
DjangoOrmSafe,
/// Callee containing `.objects.` whose last segment passes `is_unsafe_raw_django_orm_method`.
DjangoOrmRaw,
/// Anything else, including callees with no `.` at all.
NotRecognized,
}
/// Categorises a callee's source text (e.g. `cursor.execute`) by its final
/// dot-separated segment.
///
/// Callees without any `.` are never recognised. Callees containing
/// `.objects.` are only ever classified as Django ORM kinds (raw is checked
/// before safe); everything else is tested against the generic sink list.
fn recognized_sql_call_kind(func_text: &str) -> RecognizedSqlCallKind {
    let Some((_, method)) = func_text.rsplit_once('.') else {
        return RecognizedSqlCallKind::NotRecognized;
    };

    let via_objects_manager = func_text.contains(".objects.");
    if via_objects_manager {
        if is_unsafe_raw_django_orm_method(method) {
            RecognizedSqlCallKind::DjangoOrmRaw
        } else if is_safe_django_orm_method(method) {
            RecognizedSqlCallKind::DjangoOrmSafe
        } else {
            RecognizedSqlCallKind::NotRecognized
        }
    } else if is_sql_sink_method(method) {
        RecognizedSqlCallKind::GenericSink
    } else {
        RecognizedSqlCallKind::NotRecognized
    }
}
/// Classifies a recognised call by the shape of its first positional argument.
///
/// * A call with no `arguments` field is always `Ambiguous`.
/// * `DjangoOrmSafe` is `Safe`; `NotRecognized` is `Ambiguous`.
/// * `DjangoOrmRaw` is `UnsafeRaw` when the first argument is formatted SQL,
///   otherwise `Ambiguous`.
/// * `GenericSink` is `Unsafe` for formatted SQL, `Safe` for a plain string
///   literal or for a SQLAlchemy text call on a literal followed by a
///   dictionary, and `Ambiguous` otherwise.
fn classify_sql_shape(call_node: Node<'_>, source: &[u8], kind: RecognizedSqlCallKind) -> SqlApi {
    let Some(arg_list) = call_node.child_by_field_name("arguments") else {
        return SqlApi::Ambiguous;
    };
    let positional = positional_args(arg_list);
    let first_arg = positional.first().copied();

    match (kind, first_arg) {
        (RecognizedSqlCallKind::DjangoOrmSafe, _) => SqlApi::Safe,
        (RecognizedSqlCallKind::NotRecognized, _) => SqlApi::Ambiguous,
        // Nothing to inspect: the query text is not visible at the call.
        (RecognizedSqlCallKind::DjangoOrmRaw | RecognizedSqlCallKind::GenericSink, None) => {
            SqlApi::Ambiguous
        }
        (RecognizedSqlCallKind::DjangoOrmRaw, Some(query)) => {
            if first_arg_is_formatted_sql(query, source) {
                SqlApi::UnsafeRaw
            } else {
                SqlApi::Ambiguous
            }
        }
        (RecognizedSqlCallKind::GenericSink, Some(query)) => {
            if first_arg_is_formatted_sql(query, source) {
                SqlApi::Unsafe
            } else if is_string_literal(query) {
                // Covers both the single-argument static query and the
                // literal-plus-parameters form.
                SqlApi::Safe
            } else {
                let second_is_dict = positional
                    .get(1)
                    .map(|n| n.kind() == "dictionary")
                    .unwrap_or(false);
                let text_with_binds = positional.len() >= 2
                    && is_sqlalchemy_text_call_with_literal(query, source)
                    && second_is_dict;
                if text_with_binds {
                    SqlApi::Safe
                } else {
                    SqlApi::Ambiguous
                }
            }
        }
    }
}
/// Returns the positional arguments of an argument-list node, in source order.
///
/// Skipped children:
/// * anonymous nodes (punctuation such as `(`, `,`, `)`),
/// * `keyword_argument` nodes,
/// * tree-sitter "extra" nodes (e.g. comments), which may appear anywhere in
///   the tree and are named, so they would otherwise be mistaken for the
///   first argument when a comment precedes the query string.
fn positional_args<'a>(args: Node<'a>) -> Vec<Node<'a>> {
    let mut cursor = args.walk();
    // Bound to a local so the iterator borrowing `cursor` is dropped before
    // `cursor` itself goes out of scope.
    let positional: Vec<Node<'a>> = args
        .children(&mut cursor)
        .filter(|child| {
            child.is_named() && !child.is_extra() && child.kind() != "keyword_argument"
        })
        .collect();
    positional
}
/// True when `arg` is a `string` node that `string_is_fstring` does not
/// consider an f-string.
fn is_string_literal(arg: Node<'_>) -> bool {
    arg.kind() == "string" && !string_is_fstring(arg)
}
/// True when `arg` is a `string` node with at least one direct
/// `interpolation` child.
///
/// A string with an `f` prefix but no interpolation is therefore not reported
/// here.
fn string_is_fstring(arg: Node<'_>) -> bool {
    if arg.kind() != "string" {
        return false;
    }
    let mut cursor = arg.walk();
    // Bound to a local so the iterator borrowing `cursor` is dropped before
    // `cursor` itself goes out of scope.
    let has_interpolation = arg
        .children(&mut cursor)
        .any(|child| child.kind() == "interpolation");
    has_interpolation
}
/// Decides whether `arg` looks like SQL text assembled with string formatting.
///
/// Recognised shapes, all of which additionally require
/// `line_contains_sql_keyword` to accept the relevant text:
/// * a `string` node that is an f-string (has an interpolation child), or
///   whose source text starts with an `f`, `rf` or `fr` prefix in any case;
/// * a `binary_operator` using `+` or `%` that contains a string literal with
///   a SQL keyword (see `binary_op_contains_string_literal_with_sql`);
/// * a `call` whose callee text ends in `.format` and whose receiver text
///   contains a SQL keyword.
fn first_arg_is_formatted_sql(arg: Node<'_>, source: &[u8]) -> bool {
    match arg.kind() {
        "string" => {
            let text = node_text(arg, source).unwrap_or("");
            if !line_contains_sql_keyword(text) {
                return false;
            }
            if string_is_fstring(arg) {
                return true;
            }
            // Fallback on the literal's prefix. The text of a string node
            // begins with its prefix, so these can only match a real prefix.
            // Python accepts both `rf` and `fr` orderings.
            let lower = text.to_lowercase();
            ["f\"", "f'", "rf", "fr"]
                .iter()
                .any(|prefix| lower.starts_with(*prefix))
        }
        "binary_operator" => {
            let op = arg
                .child_by_field_name("operator")
                .and_then(|n| node_text(n, source))
                .unwrap_or("");
            matches!(op, "+" | "%") && binary_op_contains_string_literal_with_sql(arg, source)
        }
        "call" => {
            let Some(func) = arg.child_by_field_name("function") else {
                return false;
            };
            let func_text = node_text(func, source).unwrap_or("");
            let Some(receiver_text) = func_text.strip_suffix(".format") else {
                return false;
            };
            // `"SELECT ...".format(...)`: inspect the literal receiver itself.
            let literal_receiver_has_sql = func
                .child_by_field_name("object")
                .filter(|object| object.kind() == "string")
                .and_then(|object| node_text(object, source))
                .map(|s| line_contains_sql_keyword(s))
                .unwrap_or(false);
            // Otherwise fall back to the receiver's raw source text, which
            // also matches non-literal receivers whose text contains a keyword.
            literal_receiver_has_sql || line_contains_sql_keyword(receiver_text)
        }
        _ => false,
    }
}
/// True when any direct child of `arg` is a `string` whose text contains a
/// SQL keyword, or a nested `binary_operator` for which the same holds
/// recursively. Other child kinds are not inspected.
fn binary_op_contains_string_literal_with_sql(arg: Node<'_>, source: &[u8]) -> bool {
    let mut walker = arg.walk();
    // Bound to a local so the iterator borrowing `walker` is dropped first.
    let found = arg
        .children(&mut walker)
        .any(|operand| match operand.kind() {
            "string" => line_contains_sql_keyword(node_text(operand, source).unwrap_or("")),
            "binary_operator" => binary_op_contains_string_literal_with_sql(operand, source),
            _ => false,
        });
    found
}
/// True when `arg` is a call whose callee's last dot-separated segment passes
/// `is_sqlalchemy_text_function` and whose first positional argument is a
/// plain string literal.
fn is_sqlalchemy_text_call_with_literal(arg: Node<'_>, source: &[u8]) -> bool {
    if arg.kind() != "call" {
        return false;
    }

    let callee_is_text = arg
        .child_by_field_name("function")
        .map(|callee| {
            let dotted = node_text(callee, source).unwrap_or("");
            is_sqlalchemy_text_function(dotted.rsplit('.').next().unwrap_or(""))
        })
        .unwrap_or(false);
    if !callee_is_text {
        return false;
    }

    arg.child_by_field_name("arguments")
        .and_then(|inner_args| positional_args(inner_args).into_iter().next())
        .map(|first| is_string_literal(first))
        .unwrap_or(false)
}
/// Builds the [`Evidence`] record for one SQL call site.
///
/// Populated from the call's surroundings:
/// * enclosing function / class names,
/// * trust-boundary and route-handler signals (by function name, then by
///   decorator if the name did not match),
/// * the user-input source seen within 10 lines either side of the call,
/// * literal / keyword signals derived from the first positional argument,
/// * `sql-safe` / `sql-vulnerable` annotations on the call's first line.
///
/// `_module_root` is accepted but not used. `lines` is indexed by the
/// call's tree-sitter start row.
pub(super) fn extract_python_evidence<'a>(
    call_node: Node<'a>,
    _module_root: Node<'a>,
    source: &'a [u8],
    lines: &[&str],
    file_path: Option<String>,
    api: SqlApi,
    callee_label: String,
) -> Evidence {
    let enclosing_fn = enclosing_python_function(call_node);
    let call_row = call_node.start_position().row;
    let call_line_text = lines.get(call_row);

    let mut evidence = Evidence {
        file_path,
        api: Some(api),
        callee_label: Some(callee_label),
        enclosing_function: enclosing_fn
            .and_then(|f| f.child_by_field_name("name"))
            .and_then(|name_node| node_text(name_node, source))
            .map(|name| name.to_string()),
        enclosing_class: enclosing_python_class_name(call_node, source),
        user_input_source: classify_nearby_user_input(lines, call_row, 10),
        ..Default::default()
    };

    // Name-based signals.
    if let Some(fn_name) = &evidence.enclosing_function {
        evidence.trust_boundary_name = matches_trust_boundary_name(fn_name);
        evidence.enclosing_route_handler |= matches_route_handler_name(fn_name);
    }

    // Decorator-based route detection is only consulted when the name did not
    // already mark the function as a handler.
    if !evidence.enclosing_route_handler {
        let decorated = enclosing_fn
            .map(|f| function_has_route_decorator(f, lines))
            .unwrap_or(false);
        if decorated {
            evidence.enclosing_route_handler = true;
        }
    }

    // Signals derived from the first positional argument.
    let first_arg = call_node
        .child_by_field_name("arguments")
        .and_then(|arg_list| positional_args(arg_list).into_iter().next());
    if let Some(first) = first_arg {
        let literal = is_string_literal(first);
        let formatted = first_arg_is_formatted_sql(first, source);
        if !formatted {
            if literal {
                evidence.static_sql_string_literal = true;
            }
            if let Some(line) = call_line_text {
                if line_contains_sql_keyword(line) {
                    evidence.sql_keyword_no_formatting = true;
                }
            }
        }
    }

    // Inline annotations are read from the line the call starts on.
    if let Some(line) = call_line_text {
        evidence.sql_safe_annotation = extract_sql_safe_reason(line);
        evidence.sql_vulnerable_annotation = extract_sql_vulnerable_source(line);
    }

    evidence
}
/// Classifies the strongest user-input source mentioned within `radius` lines
/// either side of `call_line` (inclusive).
///
/// `UnstructuredJson` anywhere in the window wins over `TypedString`; if
/// neither is found the result is `UserInputSource::None`.
///
/// The window is clamped to `lines`, so an out-of-range `call_line` yields an
/// empty window (and `None`) instead of panicking on the slice.
fn classify_nearby_user_input(lines: &[&str], call_line: usize, radius: usize) -> UserInputSource {
    // Clamping both bounds to `lines.len()` keeps `start <= end`.
    let start = call_line.saturating_sub(radius).min(lines.len());
    let end = call_line
        .saturating_add(radius)
        .saturating_add(1)
        .min(lines.len());

    let mut saw_typed_string = false;
    for line in &lines[start..end] {
        match classify_user_input_source(line) {
            // Highest-priority source: no need to look any further.
            UserInputSource::UnstructuredJson => return UserInputSource::UnstructuredJson,
            UserInputSource::TypedString => saw_typed_string = true,
            _ => {}
        }
    }

    if saw_typed_string {
        UserInputSource::TypedString
    } else {
        UserInputSource::None
    }
}
/// Walks up from `fn_node` to the nearest `decorated_definition` ancestor and
/// reports whether the source line on which any of its `decorator` children
/// starts satisfies `matches_route_handler_decorator`.
///
/// The walk stops at the first `decorated_definition` or at the `module`
/// node, whichever comes first.
fn function_has_route_decorator(fn_node: Node<'_>, lines: &[&str]) -> bool {
    let mut ancestor = fn_node.parent();
    while let Some(current) = ancestor {
        match current.kind() {
            "decorated_definition" => {
                let mut walker = current.walk();
                let has_route = current
                    .children(&mut walker)
                    .filter(|child| child.kind() == "decorator")
                    .filter_map(|decorator| lines.get(decorator.start_position().row))
                    .any(|line| matches_route_handler_decorator(line));
                return has_route;
            }
            "module" => return false,
            _ => ancestor = current.parent(),
        }
    }
    false
}
/// Returns the name of the nearest `class_definition` ancestor of `node`.
///
/// Yields `None` when the `module` node or the tree root is reached first, or
/// when the class has no readable `name` field.
fn enclosing_python_class_name<'a>(node: Node<'a>, source: &'a [u8]) -> Option<String> {
    let mut ancestor = node.parent();
    while let Some(current) = ancestor {
        match current.kind() {
            "class_definition" => {
                return current
                    .child_by_field_name("name")
                    .and_then(|name_node| node_text(name_node, source))
                    .map(str::to_string);
            }
            "module" => return None,
            _ => ancestor = current.parent(),
        }
    }
    None
}
// Unit tests for call-site collection, callee recognition, argument-shape
// classification and evidence extraction. Each test parses a small inline
// Python snippet.
#[cfg(test)]
mod tests {
use super::*;
use crate::detectors::ast_fingerprint::parse_root_ext;
use crate::parsers::lightweight::Language;
/// Returns the first `call` node (pre-order, depth-first) whose callee text
/// has `attr_name` as its last dot-separated segment.
///
/// Panics when no such call exists in the tree.
fn first_call_with_attr<'tree>(
tree: &'tree tree_sitter::Tree,
source: &[u8],
attr_name: &str,
) -> tree_sitter::Node<'tree> {
// Recursive helper: checks `node` itself before descending into children.
fn walk<'a>(
node: tree_sitter::Node<'a>,
source: &[u8],
attr_name: &str,
) -> Option<tree_sitter::Node<'a>> {
if node.kind() == "call" {
if let Some(func) = node.child_by_field_name("function") {
let text = node_text(func, source).unwrap_or("");
let last = text.rsplit('.').next().unwrap_or("");
if last == attr_name {
return Some(node);
}
}
}
let mut cursor = node.walk();
for child in node.children(&mut cursor) {
if let Some(found) = walk(child, source, attr_name) {
return Some(found);
}
}
None
}
walk(tree.root_node(), source, attr_name)
.unwrap_or_else(|| panic!("no call to {} found in source", attr_name))
}
/// Parses `src` as Python, locates the first call matching `attr`, classifies
/// it, and returns the evidence extracted for it (with no file path).
fn extract(src: &str, attr: &str) -> Evidence {
let tree = parse_root_ext(src, Language::Python, "py").expect("parse python");
let root = tree.root_node();
let call = first_call_with_attr(&tree, src.as_bytes(), attr);
let lines: Vec<&str> = src.lines().collect();
let func_text = call
.child_by_field_name("function")
.and_then(|f| node_text(f, src.as_bytes()))
.unwrap_or("")
.to_string();
let kind = recognized_sql_call_kind(&func_text);
let api = classify_sql_shape(call, src.as_bytes(), kind);
extract_python_evidence(call, root, src.as_bytes(), &lines, None, api, func_text)
}
/// Parses `src` as Python and returns `(api, callee_label)` for every site
/// reported by `collect_python_sql_sites`.
fn collect_sites(src: &str) -> Vec<(SqlApi, String)> {
let tree = parse_root_ext(src, Language::Python, "py").expect("parse python");
let root = tree.root_node();
collect_python_sql_sites(root, src.as_bytes())
.into_iter()
.map(|s| (s.api, s.callee_label))
.collect()
}
// --- collect_python_sql_sites ---
#[test]
fn collect_picks_up_cursor_execute() {
let sites = collect_sites("cursor.execute(\"SELECT 1\")\n");
assert_eq!(sites.len(), 1);
assert_eq!(sites[0].1, "cursor.execute");
}
#[test]
fn collect_picks_up_django_orm_filter() {
let sites = collect_sites("User.objects.filter(id=1)\n");
assert_eq!(sites.len(), 1);
assert_eq!(sites[0].1, "User.objects.filter");
assert_eq!(sites[0].0, SqlApi::Safe);
}
#[test]
fn collect_picks_up_django_orm_raw() {
let sites = collect_sites("User.objects.raw(\"SELECT * FROM users\")\n");
assert_eq!(sites.len(), 1);
assert_eq!(sites[0].1, "User.objects.raw");
}
#[test]
fn collect_skips_bare_execute_call() {
let sites = collect_sites("execute(\"SELECT 1\")\n");
assert!(sites.is_empty());
}
#[test]
fn collect_skips_non_sql_method() {
let sites = collect_sites("user.greet()\n");
assert!(sites.is_empty());
}
// --- recognized_sql_call_kind ---
#[test]
fn recognized_kind_generic_execute() {
assert_eq!(
recognized_sql_call_kind("cursor.execute"),
RecognizedSqlCallKind::GenericSink
);
}
#[test]
fn recognized_kind_orm_safe() {
assert_eq!(
recognized_sql_call_kind("User.objects.filter"),
RecognizedSqlCallKind::DjangoOrmSafe
);
assert_eq!(
recognized_sql_call_kind("Article.objects.get"),
RecognizedSqlCallKind::DjangoOrmSafe
);
}
#[test]
fn recognized_kind_orm_raw() {
assert_eq!(
recognized_sql_call_kind("User.objects.raw"),
RecognizedSqlCallKind::DjangoOrmRaw
);
assert_eq!(
recognized_sql_call_kind("Article.objects.extra"),
RecognizedSqlCallKind::DjangoOrmRaw
);
}
#[test]
fn recognized_kind_unknown_falls_through() {
assert_eq!(
recognized_sql_call_kind("foo.bar.unknown"),
RecognizedSqlCallKind::NotRecognized
);
assert_eq!(
recognized_sql_call_kind("bare_call"),
RecognizedSqlCallKind::NotRecognized
);
}
// --- classify_sql_shape: shapes expected to be Safe ---
#[test]
fn classify_parameterized_execute_with_tuple() {
let src = "cursor.execute(\"SELECT * FROM u WHERE id = %s\", (uid,))\n";
let sites = collect_sites(src);
assert_eq!(sites[0].0, SqlApi::Safe);
}
#[test]
fn classify_parameterized_execute_with_list() {
let src = "cursor.execute(\"SELECT * FROM u WHERE id = ?\", [uid])\n";
let sites = collect_sites(src);
assert_eq!(sites[0].0, SqlApi::Safe);
}
#[test]
fn classify_django_orm_filter_safe() {
let src = "User.objects.filter(id=request.GET['id'])\n";
let sites = collect_sites(src);
assert_eq!(sites[0].0, SqlApi::Safe);
}
#[test]
fn classify_django_orm_get_safe() {
let src = "User.objects.get(id=1)\n";
let sites = collect_sites(src);
assert_eq!(sites[0].0, SqlApi::Safe);
}
#[test]
fn classify_sqlalchemy_text_with_bound_dict_safe() {
let src = "db.execute(text(\"SELECT :id\"), {\"id\": uid})\n";
let sites = collect_sites(src);
assert_eq!(sites[0].0, SqlApi::Safe);
}
#[test]
fn classify_static_literal_single_arg_safe() {
let src = "cursor.execute(\"INSERT INTO log VALUES ('static')\")\n";
let sites = collect_sites(src);
assert_eq!(sites[0].0, SqlApi::Safe);
}
// --- classify_sql_shape: shapes expected to be Unsafe / UnsafeRaw ---
#[test]
fn classify_fstring_execute_unsafe() {
let src = "cursor.execute(f\"SELECT * FROM u WHERE id = {uid}\")\n";
let sites = collect_sites(src);
assert_eq!(sites[0].0, SqlApi::Unsafe);
}
#[test]
fn classify_concat_execute_unsafe() {
let src = "cursor.execute(\"SELECT * FROM u WHERE id = \" + uid)\n";
let sites = collect_sites(src);
assert_eq!(sites[0].0, SqlApi::Unsafe);
}
#[test]
fn classify_format_execute_unsafe() {
let src = "cursor.execute(\"SELECT * FROM u WHERE id = {}\".format(uid))\n";
let sites = collect_sites(src);
assert_eq!(sites[0].0, SqlApi::Unsafe);
}
#[test]
fn classify_percent_execute_unsafe() {
let src = "cursor.execute(\"SELECT * FROM u WHERE id = %s\" % uid)\n";
let sites = collect_sites(src);
assert_eq!(sites[0].0, SqlApi::Unsafe);
}
#[test]
fn classify_django_raw_concat_unsafe_raw() {
let src = "User.objects.raw(\"SELECT * FROM u WHERE id = \" + uid)\n";
let sites = collect_sites(src);
assert_eq!(sites[0].0, SqlApi::UnsafeRaw);
}
#[test]
fn classify_django_raw_fstring_unsafe_raw() {
let src = "User.objects.raw(f\"SELECT * FROM u WHERE id = {uid}\")\n";
let sites = collect_sites(src);
assert_eq!(sites[0].0, SqlApi::UnsafeRaw);
}
#[test]
fn classify_django_raw_format_unsafe_raw() {
let src = "User.objects.raw(\"SELECT * FROM u WHERE id = {}\".format(uid))\n";
let sites = collect_sites(src);
assert_eq!(sites[0].0, SqlApi::UnsafeRaw);
}
// --- classify_sql_shape: shapes expected to be Ambiguous ---
#[test]
fn classify_django_raw_static_literal_ambiguous() {
let src = "User.objects.raw(\"SELECT * FROM u\")\n";
let sites = collect_sites(src);
assert_eq!(sites[0].0, SqlApi::Ambiguous);
}
#[test]
fn classify_opaque_variable_arg_ambiguous() {
let src = "cursor.execute(query)\n";
let sites = collect_sites(src);
assert_eq!(sites[0].0, SqlApi::Ambiguous);
}
#[test]
fn classify_function_call_arg_ambiguous() {
let src = "cursor.execute(build_query())\n";
let sites = collect_sites(src);
assert_eq!(sites[0].0, SqlApi::Ambiguous);
}
// --- extract_python_evidence: individual signals ---
#[test]
fn detects_enclosing_function_name() {
let src = "\
def get_user(user_id):\n\
\x20 return cursor.execute(\"SELECT * FROM u WHERE id = %s\", (user_id,))\n";
let ev = extract(src, "execute");
assert_eq!(ev.enclosing_function.as_deref(), Some("get_user"));
}
#[test]
fn detects_enclosing_class_name() {
let src = "\
class UserRepo:\n\
\x20 def get(self, uid):\n\
\x20 return cursor.execute(\"SELECT * FROM u\")\n";
let ev = extract(src, "execute");
assert_eq!(ev.enclosing_class.as_deref(), Some("UserRepo"));
}
#[test]
fn detects_flask_route_decorator() {
let src = "\
from flask import request\n\
@app.route('/u', methods=['POST'])\n\
def login():\n\
\x20 return cursor.execute(f\"SELECT * FROM u WHERE n = {request.form['n']}\")\n";
let ev = extract(src, "execute");
assert!(ev.enclosing_route_handler);
}
#[test]
fn detects_handler_name() {
let src = "\
def login_handler():\n\
\x20 return cursor.execute(\"SELECT 1\")\n";
let ev = extract(src, "execute");
assert!(ev.enclosing_route_handler);
}
#[test]
fn detects_unstructured_json_source() {
let src = "\
def f():\n\
\x20 body = request.json\n\
\x20 return cursor.execute(\"SELECT * WHERE x = %s\", (body['x'],))\n";
let ev = extract(src, "execute");
assert_eq!(ev.user_input_source, UserInputSource::UnstructuredJson);
}
#[test]
fn detects_typed_string_source() {
let src = "\
def f():\n\
\x20 n = request.form['name']\n\
\x20 return cursor.execute(\"SELECT * WHERE n = %s\", (n,))\n";
let ev = extract(src, "execute");
assert_eq!(ev.user_input_source, UserInputSource::TypedString);
}
#[test]
fn detects_no_user_input_source() {
let src = "\
def f():\n\
\x20 return cursor.execute(\"SELECT 1\")\n";
let ev = extract(src, "execute");
assert_eq!(ev.user_input_source, UserInputSource::None);
}
#[test]
fn detects_static_sql_string_literal_signal() {
let src = "cursor.execute(\"SELECT 1\")\n";
let ev = extract(src, "execute");
assert!(ev.static_sql_string_literal);
}
#[test]
fn detects_sql_keyword_no_formatting_signal() {
let src = "cursor.execute(\"SELECT 1\")\n";
let ev = extract(src, "execute");
assert!(ev.sql_keyword_no_formatting);
}
#[test]
fn does_not_detect_static_literal_when_fstring() {
let src = "cursor.execute(f\"SELECT * WHERE x = {x}\")\n";
let ev = extract(src, "execute");
assert!(!ev.static_sql_string_literal);
}
// --- extract_python_evidence: inline annotations ---
#[test]
fn extracts_sql_safe_annotation() {
let src = "\
def f():\n\
\x20 return cursor.execute(q) # repotoire: sql-safe[whitelisted-table]\n";
let ev = extract(src, "execute");
assert_eq!(ev.sql_safe_annotation.as_deref(), Some("whitelisted-table"));
}
#[test]
fn extracts_sql_vulnerable_annotation() {
let src = "\
def f():\n\
\x20 return cursor.execute(q) # repotoire: sql-vulnerable[helper-built]\n";
let ev = extract(src, "execute");
assert_eq!(
ev.sql_vulnerable_annotation.as_deref(),
Some("helper-built")
);
}
#[test]
fn does_not_extract_unrelated_annotation() {
let src = "\
def f():\n\
\x20 return cursor.execute(\"SELECT 1\") # repotoire: jwt-safe[ok]\n";
let ev = extract(src, "execute");
assert!(ev.sql_safe_annotation.is_none());
assert!(ev.sql_vulnerable_annotation.is_none());
}
// --- End-to-end cases (a-h): classification plus evidence on one snippet ---
#[test]
fn case_a_full_evidence_parameterized_execute() {
let src = "\
from flask import request\n\
@app.route('/u', methods=['POST'])\n\
def get_user():\n\
\x20 return cursor.execute(\"SELECT * FROM u WHERE id = %s\", (request.form['id'],))\n";
let ev = extract(src, "execute");
assert_eq!(ev.api, Some(SqlApi::Safe));
assert!(ev.enclosing_route_handler);
}
#[test]
fn case_b_full_evidence_fstring_execute() {
let src = "\
from flask import request\n\
def get_user():\n\
\x20 return cursor.execute(f\"SELECT * FROM u WHERE id = {request.form['id']}\")\n";
let ev = extract(src, "execute");
assert_eq!(ev.api, Some(SqlApi::Unsafe));
assert_eq!(ev.user_input_source, UserInputSource::TypedString);
}
#[test]
fn case_c_full_evidence_type_cast_laundered_format() {
let src = "\
def f():\n\
\x20 return cursor.execute(\"SELECT * WHERE id = {}\".format(int(request.form['id'])))\n";
let ev = extract(src, "execute");
assert_eq!(ev.api, Some(SqlApi::Unsafe));
}
#[test]
fn case_d_full_evidence_django_orm_filter() {
let src = "\
from flask import request\n\
def list_users():\n\
\x20 return User.objects.filter(id=request.GET['id'])\n";
let ev = extract(src, "filter");
assert_eq!(ev.api, Some(SqlApi::Safe));
}
#[test]
fn case_e_full_evidence_django_raw_with_concat() {
let src = "\
from flask import request\n\
def list_users():\n\
\x20 return User.objects.raw(\"SELECT * FROM u WHERE id = \" + request.GET['id'])\n";
let ev = extract(src, "raw");
assert_eq!(ev.api, Some(SqlApi::UnsafeRaw));
}
#[test]
fn case_f_full_evidence_static_literal_insert() {
let src = "\
def f():\n\
\x20 return cursor.execute(\"INSERT INTO log VALUES ('static')\")\n";
let ev = extract(src, "execute");
assert_eq!(ev.api, Some(SqlApi::Safe));
assert!(ev.static_sql_string_literal);
}
#[test]
fn case_g_full_evidence_opaque_variable() {
let src = "\
from flask import request\n\
def get_user():\n\
\x20 q = build_query(request.form)\n\
\x20 return cursor.execute(q)\n";
let ev = extract(src, "execute");
assert_eq!(ev.api, Some(SqlApi::Ambiguous));
assert_eq!(ev.user_input_source, UserInputSource::TypedString);
}
#[test]
fn case_h_full_evidence_sqlalchemy_text_with_binds() {
let src = "\
from flask import request\n\
def f():\n\
\x20 return db.execute(text(\"SELECT :id\"), {\"id\": request.form['id']})\n";
let ev = extract(src, "execute");
assert_eq!(ev.api, Some(SqlApi::Safe));
}
}